diff --git a/src/appier_extras/parts/admin/models/__init__.py b/src/appier_extras/parts/admin/models/__init__.py index a6858149..1579cb3e 100644 --- a/src/appier_extras/parts/admin/models/__init__.py +++ b/src/appier_extras/parts/admin/models/__init__.py @@ -29,6 +29,7 @@ from . import account from . import base from . import config +from . import credential from . import event from . import locale from . import role @@ -40,6 +41,7 @@ from .account import Account from .base import Base from .config import Config +from .credential import Credential from .event import Event from .locale import Locale from .role import Role diff --git a/src/appier_extras/parts/admin/models/account.py b/src/appier_extras/parts/admin/models/account.py index 8d5093d6..3b0c9425 100644 --- a/src/appier_extras/parts/admin/models/account.py +++ b/src/appier_extras/parts/admin/models/account.py @@ -28,7 +28,9 @@ __license__ = "Apache License, Version 2.0" """ The license for the module """ +import json import time +import base64 import hashlib import binascii @@ -68,6 +70,8 @@ class Account(base.Base, authenticable.Authenticable): confirmation_token = appier.field(safe=True, private=True, meta="secret") + fido2_enabled = appier.field(type=bool, description="FIDO2 Enabled") + otp_enabled = appier.field(type=bool, description="OTP Enabled") otp_secret = appier.field( @@ -106,6 +110,8 @@ class Account(base.Base, authenticable.Authenticable): roles = appier.field(type=appier.references("Role", name="id")) + credentials = appier.field(type=appier.references("Credential", name="id")) + @classmethod def setup(cls): super(Account, cls).setup() @@ -281,6 +287,66 @@ def login_otp(cls, username, otp_token, touch=True): account.touch_login_s() return account + @classmethod + def login_begin_fido2(cls, username): + # tries to retrieve the account with the provided username, so that + # the other validation steps may be done as required by login operation + account = cls.get(username=username, rules=False, build=False, raise_e=False) + if not account: + raise appier.OperationalError(message="No valid account found", code=403) + + # verifies that the retrieved account is currently enabled, because + # disabled accounts are not considered to be valid ones + if not account.enabled: + raise appier.OperationalError(message="Account is not enabled", code=403) + + fido2_server = cls._get_fido2_server() + auth_data, state = fido2_server.authenticate_begin(account.credentials_data_n) + state_json = json.dumps(state) + + auth_data_d = dict(auth_data) + auth_data_json = json.dumps(auth_data_d, cls=utils.BytesEncoder) + + return state_json, auth_data_json + + @classmethod + def login_fido2(cls, username, state, response_data, touch=True): + if not state: + raise appier.OperationalError( + message="FIDO2 state must be provided", code=400 + ) + + if not response_data: + raise appier.OperationalError( + message="FIDO2 response data must be provided", code=400 + ) + + # tries to retrieve the account with the provided username, so that + # the other validation steps may be done as required by login operation + account = cls.get(username=username, rules=False, build=False, raise_e=False) + if not account: + raise appier.OperationalError(message="No valid account found", code=403) + + # verifies that the retrieved account is currently enabled, because + # disabled accounts are not considered to be valid ones + if not account.enabled: + raise appier.OperationalError(message="Account is not enabled", code=403) + + # obtains the FIDO2 server and runs the authentication complete operation + # using the current session state and the credential data, if the + # authentication fails an exception is raised + fido2_server = cls._get_fido2_server() + fido2_server.authenticate_complete( + state, account.credentials_data_n, response_data + ) + + # "touches" the current account meaning that the last login value will be + # updated to reflect the current time and then returns the current logged + # in account to the caller method so that it may used (valid account) + if touch: + account.touch_login_s() + return account + @classmethod def confirm(cls, confirmation_token, send_email=False): # validates the current account and token for confirmation and @@ -569,6 +635,22 @@ def _is_master(cls, owner=None): admin_part = owner.admin_part return cls == admin_part.account_c + @classmethod + def _get_fido2_server(cls): + if hasattr(cls, "_fido2_server") and cls._fido2_server: + return cls._fido2_server + + _fido2 = appier.import_pip("fido2") + + import fido2.webauthn + import fido2.server + + fido2.webauthn.webauthn_json_mapping.enabled = True + rp = fido2.webauthn.PublicKeyCredentialRpEntity(name="Appier", id="localhost") + cls._fido2_server = fido2.server.Fido2Server(rp, verify_origin=lambda _: True) + + return cls._fido2_server + def pre_validate(self): base.Base.pre_validate(self) if hasattr(self, "username") and self.username: @@ -732,6 +814,48 @@ def verify_otp(self, otp_token): if not totp.verify(otp_token): raise appier.OperationalError(message="Invalid OTP code", code=403) + def register_begin_fido2(self): + cls = self.__class__ + fido2_server = cls._get_fido2_server() + + registration_data, state = fido2_server.register_begin( + dict( + id=appier.legacy.bytes(self.username, encoding="utf-8"), + name=self.username, + displayName=self.username, + ), + user_verification="discouraged", + ) + state_json = json.dumps(state) + self.session["state"] = state_json + + registration_data_d = dict(registration_data) + registration_data_json = json.dumps(registration_data_d, cls=utils.BytesEncoder) + + return registration_data_json + + def register_fido2(self, state, credential_data): + cls = self.__class__ + + # obtains the FIDO2 server and runs the registration complete operation + # using the current session state and the credential data, if the + # registration fails an exception is raised + fido2_server = cls._get_fido2_server() + auth_data = fido2_server.register_complete(state, credential_data) + + # converts the credential data into a Base64 encoded string + # the strings is structured in the WebAuthn standard format + credential_id_b64 = appier.legacy.str( + base64.b64encode(auth_data.credential_data.credential_id) + ) + credential_data_b64 = appier.legacy.str( + base64.b64encode(auth_data.credential_data) + ) + + # adds the new credential to the account effectively enabling + # FIDO2 based authentication for the account + self.add_credential_s(credential_id_b64, credential_data_b64) + def _send_avatar( self, image="avatar.png", width=None, height=None, strict=False, cache=False ): @@ -930,6 +1054,30 @@ def remove_role_s(self, name): self.roles_l.remove(_role) self.save() + @appier.operation( + name="Add Credential", + parameters=( + ("Credential ID", "credential_id", str), + ("Credential Data", "credential_data", str), + ), + ) + def add_credential_s(self, credential_id, credential_data): + from . import credential + + _credential = credential.Credential( + credential_id=credential_id, credential_data=credential_data, account=self + ) + _credential.save() + + @appier.operation( + name="Remove Credential", parameters=(("Credential ID", "credential_id", str),) + ) + def remove_credential_s(self, credential_id): + from . import credential + + _credential = credential.Credential.get(credential_id=credential_id) + _credential.delete() + @appier.operation(name="Fix Roles", level=2) def fix_children_s(self): self.roles = [role for role in self.roles if role and hasattr(role, "tokens_a")] @@ -1009,6 +1157,16 @@ def duplicates_url(cls, view=None, context=None, absolute=False): absolute=absolute, ) + @appier.view(name="Credentials") + def credentials_v(self, *args, **kwargs): + kwargs["sort"] = kwargs.get("sort", [("id", 1)]) + return appier.lazy_dict( + model=self.credentials._target, + kwargs=kwargs, + entities=appier.lazy(lambda: self.credentials.find(*args, **kwargs)), + page=appier.lazy(lambda: self.credentials.paginate(*args, **kwargs)), + ) + @property def confirmed(self): return self.enabled @@ -1035,6 +1193,8 @@ def two_factor_enabled(self): @property def two_factor_method(self): + if self.fido2_enabled: + return "fido2" if self.otp_enabled: return "otp" return None @@ -1048,6 +1208,22 @@ def otp_uri(self): self.otp_secret, ) + @property + def credentials_data(self): + from . import credential + + return credential.Credential.credentials_data_account(self) + + @property + def credentials_data_n(self): + _fido2 = appier.import_pip("fido2") + import fido2.webauthn + + return [ + fido2.webauthn.AttestedCredentialData(credential_data) + for credential_data in self.credentials_data + ] + def _set_session(self, unset=True, safes=[], method="set", two_factor=True): cls = self.__class__ if unset: diff --git a/src/appier_extras/parts/admin/models/credential.py b/src/appier_extras/parts/admin/models/credential.py new file mode 100644 index 00000000..39711293 --- /dev/null +++ b/src/appier_extras/parts/admin/models/credential.py @@ -0,0 +1,101 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Hive Appier Framework +# Copyright (c) 2008-2024 Hive Solutions Lda. +# +# This file is part of Hive Appier Framework. +# +# Hive Appier Framework is free software: you can redistribute it and/or modify +# it under the terms of the Apache License as published by the Apache +# Foundation, either version 2.0 of the License, or (at your option) any +# later version. +# +# Hive Appier Framework is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Apache License for more details. +# +# You should have received a copy of the Apache License along with +# Hive Appier Framework. If not, see . + +__author__ = "João Magalhães " +""" The author(s) of the module """ + +__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda." +""" The copyright for the module """ + +__license__ = "Apache License, Version 2.0" +""" The license for the module """ + +import base64 + +import appier + +from appier_extras.parts.admin.models import base + + +class Credential(base.Base): + """ + Model that represents a credential that is associated with + an account, this credential may be used for authentication + purposes and may be of different types (eg: FIDO2). + + It is expected that the FIDO2 authentication is going to be + performed using WebAuthn and the credential is going to be + created using the `navigator.credentials.create` method. + """ + + credential_id = appier.field( + index=True, immutable=True, description="Credential ID" + ) + + credential_data = appier.field(immutable=True) + + account = appier.field(type=appier.reference("Account", name="id"), immutable=True) + + @classmethod + def validate(cls): + return super(Credential, cls).validate() + [ + appier.not_null("credential_id"), + appier.not_empty("credential_id"), + appier.not_null("credential_data"), + appier.not_empty("credential_data"), + ] + + @classmethod + def list_names(cls): + return ["credential_id", "description", "account"] + + def post_create(self): + base.Base.post_create(self) + + account = self.account.reload() + account.credentials.append(self) + account.fido2_enabled = True + account.save() + + def post_delete(self): + base.Base.post_create(self) + + account = self.account.reload() + if self in account.credentials: + account.credentials.remove(self) + if len(account.credentials) == 0: + account.fido2_enabled = False + account.save() + + @classmethod + def get_by_credential_id(cls, credential_id, *args, **kwargs): + return cls.get(credential_id=credential_id, eager=("account",), *args, **kwargs) + + @classmethod + def find_by_account(cls, account, *args, **kwargs): + return cls.find(account=account.id, *args, **kwargs) + + @classmethod + def credentials_data_account(cls, account, *args, **kwargs): + credentials = cls.find_by_account(account=account, *args, **kwargs) + return [ + base64.b64decode(credential.credential_data) for credential in credentials + ] diff --git a/src/appier_extras/parts/admin/part.py b/src/appier_extras/parts/admin/part.py index 99b48ebb..1f929def 100644 --- a/src/appier_extras/parts/admin/part.py +++ b/src/appier_extras/parts/admin/part.py @@ -31,6 +31,7 @@ import os import json import time +import base64 import datetime import tempfile @@ -120,6 +121,8 @@ def load(self): self.owner.two_factor_route_admin = "admin.two_factor" self.owner.otp_route = "admin.otp" self.owner.otp_route_admin = "admin.otp" + self.owner.fido2_route = "admin.fido2" + self.owner.fido2_route_admin = "admin.fido2" self.owner.admin_account = self.account_c self.owner.admin_role = self.role_c self.owner.admin_login_redirect = "admin.index" @@ -197,6 +200,10 @@ def routes(self): (("GET",), "/admin/2fa", self.two_factor), (("GET",), "/admin/otp", self.otp), (("POST",), "/admin/otp", self.otp_login), + (("GET",), "/admin/fido2", self.fido2), + (("POST",), "/admin/fido2", self.fido2_login), + (("GET",), "/admin/fido2/register", self.fido2_register), + (("POST",), "/admin/fido2/register", self.fido2_register_do), (("GET"), "/admin/confirm", self.confirm), (("GET"), "/admin/recover", self.recover), (("POST"), "/admin/recover", self.recover_do), @@ -676,6 +683,10 @@ def two_factor(self): next = self.field("next") error = self.field("error") two_factor_method = self.session["2fa.method"] + if two_factor_method == "fido2": + return self.redirect( + self.url_for(self.owner.fido2_route_admin, next=next, error=error) + ) if two_factor_method == "otp": return self.redirect( self.url_for(self.owner.otp_route_admin, next=next, error=error) @@ -731,6 +742,70 @@ def otp_login(self): # alternative to the root index of the administration return self.redirect(next or self.url_for(self.owner.admin_login_redirect)) + def fido2(self): + next = self.field("next") + error = self.field("error") + + username = self.session["2fa.username"] + state, auth_data = self.account_c.login_begin_fido2(username) + self.session["state"] = state + + return self.template( + "fido2.html.tpl", next=next, error=error, auth_data=auth_data + ) + + def fido2_login(self): + next = self.field("next") + response = self.field("response") + state_json = self.session["state"] + + response_data = json.loads(response) + state = json.loads(state_json) + + username = self.session["2fa.username"] + + account = self.account_c.login_fido2( + username, state, response_data, touch=False + ) + + # updates the current session with the proper + # values to correctly authenticate the user + account.touch_login_s() + account._set_account() + + # redirects the current operation to the next URL or in + # alternative to the root index of the administration + return self.redirect(next or self.url_for(self.owner.admin_login_redirect)) + + def fido2_register(self): + next = self.field("next") + error = self.field("error") + + account = self.account_c.from_session() + registration_data = account.register_begin_fido2() + + return self.template( + "fido2_register.html.tpl", + next=next, + error=error, + registration_data=registration_data, + ) + + def fido2_register_do(self): + next = self.field("next") + credential = self.field("credential") + state_json = self.session["state"] + + credential_data = json.loads(credential) + state = json.loads(state_json) + + account = self.account_c.from_session() + account.register_fido2(state, credential_data) + + # redirects the current operation to the next URL or in + # alternative to the root index of the administration + return self.redirect(next or self.url_for(self.owner.admin_login_redirect)) + def recover(self): next = self.field("next") return self.template("recover.html.tpl", next=next) diff --git a/src/appier_extras/parts/admin/static/css/layout.css b/src/appier_extras/parts/admin/static/css/layout.css index 7f151587..735e3d46 100644 --- a/src/appier_extras/parts/admin/static/css/layout.css +++ b/src/appier_extras/parts/admin/static/css/layout.css @@ -423,3 +423,11 @@ body.mobile-s .pages > .page.other { line-height: 20px; margin: 0px auto 0px auto; } + +.fido2-register { + display: none; +} + +.fido2-auth { + display: none; +} diff --git a/src/appier_extras/parts/admin/static/js/main.js b/src/appier_extras/parts/admin/static/js/main.js index ec68627a..5efcc1d7 100644 --- a/src/appier_extras/parts/admin/static/js/main.js +++ b/src/appier_extras/parts/admin/static/js/main.js @@ -32,6 +32,16 @@ // in the matched object and starts the proper message plugin in it var message = jQuery(".header-message", matchedObject); message.umessage(); + + // retrieves the reference to the FIDO2 Auth element and registers it + // for the base FIDO2 Auth plugin so that it's properly initialized + var fido2 = jQuery(".fido2-auth", matchedObject); + fido2.ufido2auth(); + + // retrieves the reference to the FIDO2 Register element and registers it + // for the base FIDO2 Register plugin so that it's properly initialized + var fido2 = jQuery(".fido2-register", matchedObject); + fido2.ufido2register(); }; })(jQuery); @@ -62,9 +72,104 @@ }; })(jQuery); +(function(jQuery) { + jQuery.fn.ufido2auth = function(options) { + var matchedObject = this; + matchedObject.each(function(index, element) { + var _element = jQuery(element); + var form = _element.parents("form"); + + var authData = JSON.parse(_element.text()); + + authData.publicKey.challenge = base64ToUint8Array(authData.publicKey.challenge); + authData.publicKey.allowCredentials.forEach(function(credential) { + credential.id = base64ToUint8Array(credential.id); + }); + + navigator.credentials.get(authData).then(function(response) { + var responseInput = jQuery("input[type=\"hidden\"][name=\"response\"]", + form); + var serializedResponse = serializePublicKeyCredential(response); + responseInput.uxvalue(JSON.stringify(serializedResponse)); + form.submit(); + }); + }); + }; +})(jQuery); + +(function(jQuery) { + jQuery.fn.ufido2register = function(options) { + var matchedObject = this; + matchedObject.each(function(index, element) { + var _element = jQuery(element); + var form = _element.parents("form"); + + var registrationData = JSON.parse(_element.text()); + + registrationData.publicKey.user.id = base64ToUint8Array(registrationData.publicKey.user.id); + registrationData.publicKey.challenge = base64ToUint8Array(registrationData.publicKey.challenge); + + navigator.credentials.create(registrationData).then(function(credential) { + var credentialInput = jQuery("input[type=\"hidden\"][name=\"credential\"]", + form); + var serializedCredential = serializePublicKeyCredential(credential); + credentialInput.uxvalue(JSON.stringify(serializedCredential)); + form.submit(); + }); + }); + }; +})(jQuery); + jQuery(document).ready(function() { var _body = jQuery("body"); _body.bind("applied", function(event, base) { base.uapply(); }); }); + +function base64ToUint8Array(base64, urlSafe = true) { + if (urlSafe) { + base64 = base64.replace(/-/g, "+").replace(/_/g, "/"); + while (base64.length % 4 === false) { + base64 += "="; + } + } + var binaryString = atob(base64); + var len = binaryString.length; + var bytes = new Uint8Array(len); + for (var i = 0; i < len; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + return bytes; +} + +function arrayBufferToBase64(buffer) { + var binary = ''; + var bytes = new Uint8Array(buffer); + var len = bytes.byteLength; + for (var i = 0; i < len; i++) { + binary += String.fromCharCode(bytes[i]); + } + return window.btoa(binary); +} + +function serializePublicKeyCredential(publicKeyCredential) { + var serialized = {}; + + for (var key in publicKeyCredential) { + if (publicKeyCredential[key] === undefined) { + continue; + } + + var value = publicKeyCredential[key]; + if (value instanceof ArrayBuffer) { + serialized[key] = arrayBufferToBase64(value); + } else if (value instanceof Object && !Array.isArray(value)) { + serialized[key] = serializePublicKeyCredential(value); + } else { + serialized[key] = value; + } + } + + return serialized; +} diff --git a/src/appier_extras/parts/admin/templates/fluid/account/show.html.tpl b/src/appier_extras/parts/admin/templates/fluid/account/show.html.tpl index b0b9ab4f..f78681c1 100644 --- a/src/appier_extras/parts/admin/templates/fluid/account/show.html.tpl +++ b/src/appier_extras/parts/admin/templates/fluid/account/show.html.tpl @@ -5,6 +5,9 @@ {% block buttons %} {{ super() }}