diff --git a/src/eduid/common/config/base.py b/src/eduid/common/config/base.py index 5d36abbec..1e0997db4 100644 --- a/src/eduid/common/config/base.py +++ b/src/eduid/common/config/base.py @@ -371,6 +371,7 @@ class AuthnParameters(BaseModel): same_user: bool = True # the same user was required to log in, such as when entering the security center max_age: timedelta = timedelta(minutes=5) # the maximum age of the authentication allow_login_auth: bool = False # allow login authentication as substitute action + allow_signup_auth: bool = False # allow during signup finish_url: str # str as we want to use unformatted parts as {app_name} and {authn_id} @@ -384,6 +385,7 @@ class FrontendActionMixin(BaseModel): force_authn=True, high_security=True, allow_login_auth=True, + allow_signup_auth=True, finish_url="https://eduid.se/profile/ext-return/{app_name}/{authn_id}", ), FrontendAction.CHANGE_PW_AUTHN: AuthnParameters( @@ -421,6 +423,7 @@ class FrontendActionMixin(BaseModel): FrontendAction.VERIFY_IDENTITY: AuthnParameters( force_authn=True, allow_login_auth=True, + allow_signup_auth=True, finish_url="https://eduid.se/profile/ext-return/{app_name}/{authn_id}", ), FrontendAction.TERMINATE_ACCOUNT_AUTHN: AuthnParameters( @@ -433,6 +436,7 @@ class FrontendActionMixin(BaseModel): force_authn=True, force_mfa=True, allow_login_auth=True, + allow_signup_auth=True, finish_url="https://eduid.se/profile/ext-return/{app_name}/{authn_id}", ), FrontendAction.REMOVE_IDENTITY: AuthnParameters( @@ -443,6 +447,8 @@ class FrontendActionMixin(BaseModel): ), } ) + # slack for recent signup validity as an authn action + signup_auth_slack: timedelta = timedelta(minutes=3) class ProofingConfigMixin(FrontendActionMixin): diff --git a/src/eduid/satosa/scimapi/static_attributes.py b/src/eduid/satosa/scimapi/static_attributes.py index d5e37032d..3a84d0c62 100644 --- a/src/eduid/satosa/scimapi/static_attributes.py +++ b/src/eduid/satosa/scimapi/static_attributes.py @@ -78,7 +78,6 @@ def _build_static(self, requester: str, vidp: str, existing_attributes: dict) -> else: static_attributes[attr_name] = fmt - logger.debug(f"Appending static attribute {attr_name}: {fmt} for requester {requester} or {vidp}") return static_attributes diff --git a/src/eduid/webapp/bankid/acs_actions.py b/src/eduid/webapp/bankid/acs_actions.py index 6096ae2a4..fa35e76a0 100644 --- a/src/eduid/webapp/bankid/acs_actions.py +++ b/src/eduid/webapp/bankid/acs_actions.py @@ -1,12 +1,13 @@ from eduid.userdb import User from eduid.userdb.credentials.fido import FidoCredential from eduid.webapp.bankid.app import current_bankid_app as current_app -from eduid.webapp.bankid.helpers import BankIDMsg, check_reauthn +from eduid.webapp.bankid.helpers import BankIDMsg from eduid.webapp.bankid.proofing import get_proofing_functions from eduid.webapp.common.api.decorators import require_user from eduid.webapp.common.api.messages import AuthnStatusMsg from eduid.webapp.common.authn.acs_enums import BankIDAcsAction from eduid.webapp.common.authn.acs_registry import ACSArgs, ACSResult, acs_action +from eduid.webapp.common.authn.utils import check_reauthn from eduid.webapp.common.proofing.messages import ProofingMsg from eduid.webapp.common.proofing.methods import ProofingMethodSAML from eduid.webapp.common.proofing.saml_helpers import authn_ctx_to_loa, is_required_loa, is_valid_authn_instant @@ -105,7 +106,9 @@ def verify_credential_action(user: User, args: ACSArgs) -> ACSResult: return ACSResult(message=BankIDMsg.credential_not_found) # Check (again) if token was used to authenticate this session and that the auth is not stale. - _need_reauthn = check_reauthn(frontend_action=args.authn_req.frontend_action, user=user, credential_used=credential) + _need_reauthn = check_reauthn( + frontend_action=args.authn_req.frontend_action, user=user, credential_requested=credential + ) if _need_reauthn: current_app.logger.error(f"User needs to authenticate: {_need_reauthn}") return ACSResult(message=AuthnStatusMsg.must_authenticate) diff --git a/src/eduid/webapp/bankid/helpers.py b/src/eduid/webapp/bankid/helpers.py index a06234da9..eae6574a5 100644 --- a/src/eduid/webapp/bankid/helpers.py +++ b/src/eduid/webapp/bankid/helpers.py @@ -6,15 +6,10 @@ from saml2.client import Saml2Client from saml2.typing import SAMLHttpArgs -from eduid.common.config.base import FrontendAction -from eduid.userdb import User -from eduid.userdb.credentials import Credential from eduid.userdb.credentials.external import TrustFramework from eduid.webapp.bankid.app import current_bankid_app as current_app from eduid.webapp.common.api.messages import TranslatableMsg -from eduid.webapp.common.api.schemas.authn_status import AuthnActionStatus from eduid.webapp.common.authn.cache import OutstandingQueriesCache -from eduid.webapp.common.authn.utils import validate_authn_for_action from eduid.webapp.common.session import session from eduid.webapp.common.session.namespaces import AuthnRequestRef @@ -94,21 +89,3 @@ def create_authn_info( oq_cache = OutstandingQueriesCache(session.bankid.sp.pysaml2_dicts) oq_cache.set(session_id, authn_ref) return info - - -def check_reauthn( - frontend_action: FrontendAction, user: User, credential_used: Credential | None = None -) -> AuthnActionStatus | None: - """Check if a re-authentication has been performed recently enough for this action""" - - authn_status = validate_authn_for_action( - config=current_app.conf, frontend_action=frontend_action, credential_used=credential_used, user=user - ) - current_app.logger.debug(f"check_reauthn called with authn status {authn_status}") - if authn_status != AuthnActionStatus.OK: - if authn_status == AuthnActionStatus.STALE: - # count stale authentications to monitor if users need more time - current_app.stats.count(name=f"{frontend_action.value}_stale_reauthn", value=1) - return authn_status - current_app.stats.count(name=f"{frontend_action.value}_successful_reauthn", value=1) - return None diff --git a/src/eduid/webapp/bankid/tests/test_app.py b/src/eduid/webapp/bankid/tests/test_app.py index f59736aca..c7259856e 100644 --- a/src/eduid/webapp/bankid/tests/test_app.py +++ b/src/eduid/webapp/bankid/tests/test_app.py @@ -4,6 +4,7 @@ import os import unittest from collections.abc import Mapping +from datetime import timedelta from typing import Any from unittest.mock import MagicMock, patch @@ -18,12 +19,11 @@ from eduid.webapp.bankid.helpers import BankIDMsg from eduid.webapp.common.api.messages import AuthnStatusMsg, TranslatableMsg from eduid.webapp.common.api.testing import CSRFTestClient -from eduid.webapp.common.authn.acs_enums import AuthnAcsAction from eduid.webapp.common.authn.cache import OutstandingQueriesCache from eduid.webapp.common.proofing.messages import ProofingMsg from eduid.webapp.common.proofing.testing import ProofingTests from eduid.webapp.common.session import EduidSession -from eduid.webapp.common.session.namespaces import AuthnRequestRef, SP_AuthnRequest +from eduid.webapp.common.session.namespaces import AuthnRequestRef __author__ = "lundberg" @@ -199,6 +199,7 @@ def update_config(self, config: dict[str, Any]) -> dict[str, Any]: "force_authn": True, "force_mfa": True, "allow_login_auth": True, + "allow_signup_auth": True, "finish_url": "http://test.localhost/testing-verify-credential/{app_name}/{authn_id}", }, }, @@ -265,18 +266,6 @@ def generate_auth_response( return resp.encode("utf-8") - def _setup_faked_login(self, session: EduidSession, credentials_used: list[ElementKey]) -> None: - logger.debug(f"Test setting credentials used for login in session {session}: {credentials_used}") - authn_req = SP_AuthnRequest( - post_authn_action=AuthnAcsAction.login, - credentials_used=credentials_used, - authn_instant=utc_now(), - frontend_action=FrontendAction.LOGIN, - finish_url="https://example.com/ext-return/{app_name}/{authn_id}", - ) - - session.authn.sp.authns = {authn_req.authn_id: authn_req} - @staticmethod def _get_request_id_from_session(session: EduidSession) -> tuple[str, AuthnRequestRef]: """extract the (probable) SAML request ID from the session""" @@ -329,6 +318,7 @@ def verify_token( expect_error: bool = False, expect_saml_error: bool = False, identity: NinIdentity | None = None, + logged_in: bool = True, method: str = "bankid", response_template: str | None = None, verify_credential: ElementKey | None = None, @@ -344,6 +334,7 @@ def verify_token( expect_saml_error=expect_saml_error, frontend_action=frontend_action, identity=identity, + logged_in=logged_in, method=method, response_template=response_template, verify_credential=verify_credential, @@ -355,9 +346,10 @@ def _get_authn_redirect_url( endpoint: str, method: str, frontend_action: FrontendAction, + expect_success: bool = True, verify_credential: ElementKey | None = None, frontend_state: str | None = None, - ) -> str: + ) -> str | None: with browser.session_transaction() as sess: csrf_token = sess.get_csrf_token() @@ -373,10 +365,16 @@ def _get_authn_redirect_url( req["frontend_action"] = frontend_action.value assert browser response = browser.post(endpoint, json=req) - self._check_success_response(response, type_=None, payload={"csrf_token": csrf_token}) - - loc = self.get_response_payload(response).get("location") - assert loc is not None, "Location in header is missing" + if expect_success: + self._check_success_response(response, type_=None, payload={"csrf_token": csrf_token}) + loc = self.get_response_payload(response).get("location") + assert loc is not None, "Location in header is missing" + else: + loc = None + payload = {"csrf_token": csrf_token} + if verify_credential: + payload["credential_description"] = "test" + self._check_error_response(response, type_=None, payload=payload, msg=AuthnStatusMsg.must_authenticate) return loc def _call_endpoint_and_saml_acs( @@ -411,15 +409,17 @@ def _call_endpoint_and_saml_acs( assert isinstance(browser, CSRFTestClient) - browser_with_session_cookie = self.session_cookie(browser, eppn) - if not logged_in: + if logged_in: + browser_with_session_cookie = self.session_cookie(browser, eppn) + self.set_authn_action( + eppn=eppn, + frontend_action=FrontendAction.LOGIN, + credentials_used=credentials_used, + ) + else: browser_with_session_cookie = self.session_cookie_anon(browser) with browser_with_session_cookie as browser: - if credentials_used: - with browser.session_transaction() as sess: - self._setup_faked_login(session=sess, credentials_used=credentials_used) - if logged_in is False: with browser.session_transaction() as sess: # the user is at least partially logged in at this stage @@ -515,6 +515,54 @@ def test_webauthn_token_verify(self, mock_request_user_sync: MagicMock) -> None: self._verify_user_parameters(eppn, token_verified=True, num_proofings=1) + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_webauthn_token_verify_signup_authn(self, mock_request_user_sync: MagicMock) -> None: + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.test_user.eppn + + credential = self.add_security_key_to_user(eppn, "test", "webauthn") + + self._verify_user_parameters(eppn) + + self.setup_signup_authn(eppn=eppn) + + self.verify_token( + endpoint="/verify-credential", + frontend_action=FrontendAction.VERIFY_CREDENTIAL, + eppn=eppn, + expect_msg=BankIDMsg.credential_verify_success, + verify_credential=credential.key, + logged_in=False, + ) + + self._verify_user_parameters(eppn, token_verified=True, num_proofings=1) + + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_webauthn_token_verify_signup_authn_token_to_old(self, mock_request_user_sync: MagicMock) -> None: + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.test_user.eppn + credential = self.add_security_key_to_user( + eppn=eppn, keyhandle="test", token_type="webauthn", created_ts=utc_now() + timedelta(minutes=6) + ) + self._verify_user_parameters(eppn) + + self.setup_signup_authn(eppn=eppn) + + with self.session_cookie(self.browser, eppn) as browser: + location = self._get_authn_redirect_url( + browser=browser, + endpoint="/verify-credential", + method="bankid", + frontend_action=FrontendAction.VERIFY_CREDENTIAL, + verify_credential=credential.key, + expect_success=False, + ) + assert location is None + + self._verify_user_parameters(eppn, token_verified=False, num_proofings=0) + def test_mfa_token_verify_wrong_verified_nin(self) -> None: eppn = self.test_user.eppn nin = self.test_user_wrong_nin @@ -722,6 +770,40 @@ def test_nin_verify(self, mock_request_user_sync: MagicMock) -> None: assert doc["given_name"] == "Ûlla" assert doc["surname"] == "Älm" + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_nin_verify_signup_auth(self, mock_request_user_sync: MagicMock) -> None: + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.test_unverified_user_eppn + self._verify_user_parameters(eppn, num_mfa_tokens=0, identity_verified=False) + + self.setup_signup_authn(eppn=eppn) + + self.reauthn( + "/verify-identity", + frontend_action=FrontendAction.VERIFY_IDENTITY, + expect_msg=BankIDMsg.identity_verify_success, + eppn=eppn, + logged_in=False, + ) + user = self.app.central_userdb.get_user_by_eppn(eppn) + self._verify_user_parameters( + eppn, + num_mfa_tokens=0, + identity_verified=True, + num_proofings=1, + locked_identity=user.identities.nin, + proofing_method=IdentityProofingMethod.BANKID, + proofing_version=self.app.conf.bankid_proofing_version, + ) + # check names + assert user.given_name == "Ûlla" + assert user.surname == "Älm" + # check proofing log + doc = self.app.proofing_log._get_documents_by_attr(attr="eduPersonPrincipalName", value=eppn)[0] + assert doc["given_name"] == "Ûlla" + assert doc["surname"] == "Älm" + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") def test_mfa_login(self, mock_request_user_sync: MagicMock) -> None: mock_request_user_sync.side_effect = self.request_user_sync diff --git a/src/eduid/webapp/bankid/views.py b/src/eduid/webapp/bankid/views.py index 6b629b221..74c82f142 100644 --- a/src/eduid/webapp/bankid/views.py +++ b/src/eduid/webapp/bankid/views.py @@ -8,7 +8,7 @@ from eduid.userdb.credentials import FidoCredential from eduid.userdb.element import ElementKey from eduid.webapp.bankid.app import current_bankid_app as current_app -from eduid.webapp.bankid.helpers import BankIDMsg, check_reauthn, create_authn_info +from eduid.webapp.bankid.helpers import BankIDMsg, create_authn_info from eduid.webapp.common.api.decorators import MarshalWith, UnmarshalWith, require_user from eduid.webapp.common.api.errors import EduidErrorsContext, goto_errors_response from eduid.webapp.common.api.helpers import check_magic_cookie @@ -18,7 +18,6 @@ FluxData, TranslatableMsg, error_response, - need_authentication_response, success_response, ) from eduid.webapp.common.api.schemas.authn_status import StatusRequestSchema, StatusResponseSchema @@ -26,7 +25,7 @@ from eduid.webapp.common.authn.acs_enums import BankIDAcsAction from eduid.webapp.common.authn.acs_registry import ACSArgs, get_action from eduid.webapp.common.authn.eduid_saml2 import process_assertion -from eduid.webapp.common.authn.utils import get_location +from eduid.webapp.common.authn.utils import check_reauthn, get_location from eduid.webapp.common.proofing.methods import ProofingMethodSAML, get_proofing_method from eduid.webapp.common.proofing.saml_helpers import create_metadata from eduid.webapp.common.session import session @@ -94,14 +93,9 @@ def verify_credential( current_app.logger.error(f"Can't find credential with id: {credential_id}") return error_response(message=BankIDMsg.credential_not_found) - _need_reauthn = check_reauthn(frontend_action=_frontend_action, user=user, credential_used=credential) + _need_reauthn = check_reauthn(frontend_action=_frontend_action, user=user, credential_requested=credential) if _need_reauthn: - current_app.logger.debug(f"Need re-authentication for credential: {credential_id}") - return need_authentication_response( - frontend_action=_frontend_action, - authn_status=_need_reauthn, - payload={"credential_description": credential.description}, # type: ignore[attr-defined] - ) + return _need_reauthn result = _authn( BankIDAcsAction.verify_credential, diff --git a/src/eduid/webapp/common/api/testing.py b/src/eduid/webapp/common/api/testing.py index 95b62aede..f96782990 100644 --- a/src/eduid/webapp/common/api/testing.py +++ b/src/eduid/webapp/common/api/testing.py @@ -8,7 +8,7 @@ from collections.abc import Generator, Iterable, Mapping from contextlib import contextmanager from copy import deepcopy -from datetime import timedelta +from datetime import datetime, timedelta from typing import Any, Generic, TypeVar, cast from fido2.webauthn import AuthenticatorAttachment @@ -318,7 +318,7 @@ def set_authn_action( post_authn_action: AuthnAcsAction = AuthnAcsAction.login, age: timedelta = timedelta(seconds=30), finish_url: str | None = None, - force_mfa: bool = False, + mock_mfa: bool = False, credentials_used: list[ElementKey] | None = None, ) -> None: if not finish_url: @@ -327,7 +327,7 @@ def set_authn_action( if credentials_used is None: credentials_used = [] - if force_mfa: + if mock_mfa: credentials_used = [ElementKey("mock_credential_one"), ElementKey("mock_credential_two")] with self.session_cookie(self.browser, eppn) as client: @@ -342,11 +342,25 @@ def set_authn_action( ) sess.authn.sp.authns[sp_authn_req.authn_id] = sp_authn_req - def add_security_key_to_user(self, eppn: str, keyhandle: str, token_type: str = "webauthn") -> U2F | Webauthn: + def setup_signup_authn(self, eppn: str | None = None, user_created_at: datetime | None = None) -> None: + if eppn is None: + eppn = self.test_user_eppn + if user_created_at is None: + user_created_at = utc_now() - timedelta(minutes=3) + # mock recent account creation from signup + with self.session_cookie(self.browser, eppn) as client: + with client.session_transaction() as sess: + sess.signup.user_created = True + sess.signup.user_created_at = user_created_at + + def add_security_key_to_user( + self, eppn: str, keyhandle: str, token_type: str = "webauthn", created_ts: datetime = utc_now() + ) -> U2F | Webauthn: user = self.app.central_userdb.get_user_by_eppn(eppn) mfa_token: U2F | Webauthn if token_type == "u2f": mfa_token = U2F( + created_ts=created_ts, version="test", keyhandle=keyhandle, public_key="test", @@ -357,6 +371,7 @@ def add_security_key_to_user(self, eppn: str, keyhandle: str, token_type: str = ) else: mfa_token = Webauthn( + created_ts=created_ts, keyhandle=keyhandle, credential_data="test", app_id="test", diff --git a/src/eduid/webapp/common/authn/utils.py b/src/eduid/webapp/common/authn/utils.py index 968425e2a..d325b683a 100644 --- a/src/eduid/webapp/common/authn/utils.py +++ b/src/eduid/webapp/common/authn/utils.py @@ -3,7 +3,9 @@ import os.path import sys from collections.abc import Sequence +from typing import TYPE_CHECKING, cast +from flask import current_app as flask_current_app from saml2 import server from saml2.config import SPConfig from saml2.typing import SAMLHttpArgs @@ -14,11 +16,19 @@ from eduid.common.utils import urlappend from eduid.userdb import User from eduid.userdb.credentials import Credential, FidoCredential +from eduid.webapp.common.api.messages import FluxData, need_authentication_response from eduid.webapp.common.api.schemas.authn_status import AuthnActionStatus from eduid.webapp.common.authn.session_info import SessionInfo from eduid.webapp.common.session import session from eduid.webapp.common.session.namespaces import SP_AuthnRequest +if TYPE_CHECKING: + from eduid.webapp.common.authn.middleware import AuthnBaseApp + + current_app = cast(AuthnBaseApp, flask_current_app) +else: + current_app = flask_current_app + logger = logging.getLogger(__name__) @@ -127,7 +137,7 @@ def validate_authn_for_action( config: FrontendActionMixin, frontend_action: FrontendAction, user: User, - credential_used: Credential | None = None, + credential_requested: Credential | None = None, ) -> AuthnActionStatus: """ Validate the authentication for the given frontend action. @@ -136,6 +146,24 @@ def validate_authn_for_action( logger.debug(f"Validating authentication for frontend action {frontend_action}") authn, authn_params = get_authn_for_action(config=config, frontend_action=frontend_action) + if not authn and authn_params.allow_signup_auth: + # check if the user is just created in the process of signup + if session.signup.user_created is False or session.signup.user_created_at is None: + logger.info("No signup authentication found") + logger.debug(f"frontend action: {frontend_action}, allow_signup_auth={authn_params.allow_signup_auth}") + return AuthnActionStatus.NOT_FOUND + + if credential_requested and not credential_recently_used( + user=user, credential=credential_requested, action=authn, max_age=int(authn_params.max_age.total_seconds()) + ): + logger.info(f"Credential {credential_requested} has not been used recently") + return AuthnActionStatus.CREDENTIAL_NOT_RECENTLY_USED + + user_age = utc_now() - session.signup.user_created_at + if user_age < (authn_params.max_age + config.signup_auth_slack): + logger.info("Signup in progress, no authentication required") + return AuthnActionStatus.OK + if not authn or not authn.authn_instant: logger.info("No authentication found") logger.debug(f"frontend action: {frontend_action}, allow_login_auth={authn_params.allow_login_auth}") @@ -170,16 +198,17 @@ def validate_authn_for_action( logger.info(f"Expected at least 2 credentials got: {len(authn.credentials_used)}") return AuthnActionStatus.NO_MFA - if credential_used and not credential_recently_used( - credential=credential_used, action=authn, max_age=int(authn_params.max_age.total_seconds()) + if credential_requested and not credential_recently_used( + user=user, credential=credential_requested, action=authn, max_age=int(authn_params.max_age.total_seconds()) ): - logger.info(f"Credential {credential_used} has not been used recently") + logger.info(f"Credential {credential_requested} has not been used recently") return AuthnActionStatus.CREDENTIAL_NOT_RECENTLY_USED return AuthnActionStatus.OK -def credential_recently_used(credential: Credential, action: SP_AuthnRequest | None, max_age: int) -> bool: +def credential_recently_used(user: User, credential: Credential, action: SP_AuthnRequest | None, max_age: int) -> bool: + # check if the credential was used in an authentication action in the last max_age seconds logger.debug(f"Checking if credential {credential} has been used in the last {max_age} seconds") if action and credential.key in action.credentials_used: if action.authn_instant is not None: @@ -187,4 +216,41 @@ def credential_recently_used(credential: Credential, action: SP_AuthnRequest | N if 0 < age < max_age: logger.debug(f"Credential {credential} has been used recently") return True + + # check if the credential was added to the user in the last max_age seconds + logger.debug(f"Checking if credential {credential} has been added in the last {max_age} seconds") + if user_cred := user.credentials.find(key=credential.key): + age = (utc_now() - user_cred.created_ts).total_seconds() + if 0 < age < max_age: + logger.debug(f"Credential {credential} has been added recently") + return True + return False + + +def check_reauthn( + frontend_action: FrontendAction, user: User, credential_requested: Credential | None = None +) -> FluxData | None: + """Check if a re-authentication has been performed recently enough for this action""" + + # please mypy + conf = getattr(current_app, "conf", None) + if not isinstance(conf, FrontendActionMixin): + raise RuntimeError(f"Could not find conf in {current_app}") + + authn_status = validate_authn_for_action( + config=conf, frontend_action=frontend_action, credential_requested=credential_requested, user=user + ) + current_app.logger.debug(f"check_reauthn called with authn status {authn_status}") + if authn_status != AuthnActionStatus.OK: + if authn_status == AuthnActionStatus.STALE: + # count stale authentications to monitor if users need more time + current_app.stats.count(name=f"{frontend_action.value}_stale_reauthn", value=1) + payload = None + if credential_requested and isinstance(credential_requested, FidoCredential): + current_app.logger.debug(f"Need re-authentication with credential: {credential_requested}") + payload = {"credential_description": credential_requested.description} + return need_authentication_response(frontend_action=frontend_action, authn_status=authn_status, payload=payload) + + current_app.stats.count(name=f"{frontend_action.value}_successful_reauthn", value=1) + return None diff --git a/src/eduid/webapp/common/session/namespaces.py b/src/eduid/webapp/common/session/namespaces.py index a18ae245a..6e81a6e92 100644 --- a/src/eduid/webapp/common/session/namespaces.py +++ b/src/eduid/webapp/common/session/namespaces.py @@ -169,6 +169,7 @@ class Credentials(SessionNSBase): class Signup(TimestampedNS): user_created: bool = False + user_created_at: datetime | None = None name: Name = Field(default_factory=Name) email: EmailVerification = Field(default_factory=EmailVerification) invite: Invite = Field(default_factory=Invite) @@ -289,16 +290,9 @@ def authns_cleanup( return dict(items[:10]) return v - def _remove_get_authn_for_action( - self, action: AuthnAcsAction | EidasAcsAction | BankIDAcsAction - ) -> SP_AuthnRequest | None: - for authn in self.authns.values(): - if authn.post_authn_action == action: - return authn - return None - def get_authn_for_frontend_action(self, action: FrontendAction) -> SP_AuthnRequest | None: - for authn in self.authns.values(): + # sort authn actions by created_ts and return the first one (latest) that matches the action + for authn in sorted(self.authns.values(), reverse=True, key=lambda item: item.created_ts): if authn.frontend_action == action: return authn return None diff --git a/src/eduid/webapp/eidas/acs_actions.py b/src/eduid/webapp/eidas/acs_actions.py index f008181e6..be876d861 100644 --- a/src/eduid/webapp/eidas/acs_actions.py +++ b/src/eduid/webapp/eidas/acs_actions.py @@ -4,13 +4,14 @@ from eduid.webapp.common.api.messages import AuthnStatusMsg from eduid.webapp.common.authn.acs_enums import EidasAcsAction from eduid.webapp.common.authn.acs_registry import ACSArgs, ACSResult, acs_action +from eduid.webapp.common.authn.utils import check_reauthn from eduid.webapp.common.proofing.messages import ProofingMsg from eduid.webapp.common.proofing.methods import ProofingMethodSAML from eduid.webapp.common.proofing.saml_helpers import authn_ctx_to_loa, is_required_loa, is_valid_authn_instant from eduid.webapp.common.session import session from eduid.webapp.common.session.namespaces import SP_AuthnRequest from eduid.webapp.eidas.app import current_eidas_app as current_app -from eduid.webapp.eidas.helpers import EidasMsg, check_reauthn +from eduid.webapp.eidas.helpers import EidasMsg from eduid.webapp.eidas.proofing import get_proofing_functions from eduid.webapp.eidas.saml_session_info import BaseSessionInfo @@ -109,7 +110,9 @@ def verify_credential_action(user: User, args: ACSArgs) -> ACSResult: return ACSResult(message=EidasMsg.credential_not_found) # Check (again) if token was used to authenticate this session and that the auth is not stale. - _need_reauthn = check_reauthn(frontend_action=args.authn_req.frontend_action, user=user, credential_used=credential) + _need_reauthn = check_reauthn( + frontend_action=args.authn_req.frontend_action, user=user, credential_requested=credential + ) if _need_reauthn: current_app.logger.error(f"User needs to authenticate: {_need_reauthn}") return ACSResult(message=AuthnStatusMsg.must_authenticate) diff --git a/src/eduid/webapp/eidas/helpers.py b/src/eduid/webapp/eidas/helpers.py index 91d9af7f9..a01a47e56 100644 --- a/src/eduid/webapp/eidas/helpers.py +++ b/src/eduid/webapp/eidas/helpers.py @@ -7,15 +7,10 @@ from saml2.client import Saml2Client from saml2.typing import SAMLHttpArgs -from eduid.common.config.base import FrontendAction -from eduid.userdb import User -from eduid.userdb.credentials import Credential from eduid.userdb.credentials.external import TrustFramework from eduid.webapp.common.api.messages import TranslatableMsg -from eduid.webapp.common.api.schemas.authn_status import AuthnActionStatus from eduid.webapp.common.authn.cache import OutstandingQueriesCache from eduid.webapp.common.authn.session_info import SessionInfo -from eduid.webapp.common.authn.utils import validate_authn_for_action from eduid.webapp.common.session import session from eduid.webapp.common.session.namespaces import AuthnRequestRef from eduid.webapp.eidas.app import current_eidas_app as current_app @@ -120,21 +115,3 @@ class CredentialVerifyResult: verified_ok: bool message: EidasMsg | None = None credential_description: str | None = None - - -def check_reauthn( - frontend_action: FrontendAction, user: User, credential_used: Credential | None = None -) -> AuthnActionStatus | None: - """Check if a re-authentication has been performed recently enough for this action""" - - authn_status = validate_authn_for_action( - config=current_app.conf, frontend_action=frontend_action, credential_used=credential_used, user=user - ) - current_app.logger.debug(f"check_reauthn called with authn status {authn_status}") - if authn_status != AuthnActionStatus.OK: - if authn_status == AuthnActionStatus.STALE: - # count stale authentications to monitor if users need more time - current_app.stats.count(name=f"{frontend_action.value}_stale_reauthn", value=1) - return authn_status - current_app.stats.count(name=f"{frontend_action.value}_successful_reauthn", value=1) - return None diff --git a/src/eduid/webapp/eidas/tests/test_app.py b/src/eduid/webapp/eidas/tests/test_app.py index efac7eda6..e3c6b4e15 100644 --- a/src/eduid/webapp/eidas/tests/test_app.py +++ b/src/eduid/webapp/eidas/tests/test_app.py @@ -3,6 +3,7 @@ import logging import os from collections.abc import Mapping +from datetime import timedelta from typing import Any from unittest import TestCase from unittest.mock import MagicMock, patch @@ -223,6 +224,7 @@ def update_config(self, config: dict[str, Any]) -> dict[str, Any]: "force_authn": True, "force_mfa": True, "allow_login_auth": True, + "allow_signup_auth": True, "finish_url": "http://test.localhost/testing-verify-credential/{app_name}/{authn_id}", }, }, @@ -361,7 +363,6 @@ def reauthn( identity=identity, logged_in=logged_in, method=method, - next_url=next_url, response_template=response_template, ) @@ -377,6 +378,7 @@ def verify_token( expect_error: bool = False, expect_saml_error: bool = False, identity: NinIdentity | EIDASIdentity | None = None, + logged_in: bool = True, method: str = "freja", response_template: str | None = None, verify_credential: ElementKey | None = None, @@ -392,6 +394,7 @@ def verify_token( expect_saml_error=expect_saml_error, frontend_action=frontend_action, identity=identity, + logged_in=logged_in, method=method, response_template=response_template, verify_credential=verify_credential, @@ -447,7 +450,6 @@ def _call_endpoint_and_saml_acs( expect_saml_error: bool = False, identity: NinIdentity | EIDASIdentity | None = None, logged_in: bool = True, - next_url: str | None = None, response_template: str | None = None, verify_credential: ElementKey | None = None, frontend_state: str | None = "This is a unit test", @@ -468,15 +470,17 @@ def _call_endpoint_and_saml_acs( assert isinstance(browser, CSRFTestClient) - browser_with_session_cookie = self.session_cookie(browser, eppn) - if not logged_in: + if logged_in: + browser_with_session_cookie = self.session_cookie(browser, eppn) + self.set_authn_action( + eppn=eppn, + frontend_action=FrontendAction.LOGIN, + credentials_used=credentials_used, + ) + else: browser_with_session_cookie = self.session_cookie_anon(browser) with browser_with_session_cookie as browser: - if credentials_used: - with browser.session_transaction() as sess: - self._setup_faked_login(session=sess, credentials_used=credentials_used) - if logged_in is False: with browser.session_transaction() as sess: # the user is at least partially logged in at this stage @@ -588,6 +592,55 @@ def test_webauthn_token_verify(self, mock_request_user_sync: MagicMock, mock_get self._verify_user_parameters(eppn, token_verified=True, num_proofings=1) + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_webauthn_token_verify_signup_authn(self, mock_request_user_sync: MagicMock) -> None: + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.test_user.eppn + + credential = self.add_security_key_to_user(eppn, "test", "webauthn") + + self._verify_user_parameters(eppn) + + self.setup_signup_authn() + + self.verify_token( + endpoint="/verify-credential", + frontend_action=FrontendAction.VERIFY_CREDENTIAL, + eppn=eppn, + expect_msg=EidasMsg.credential_verify_success, + credentials_used=[credential.key, ElementKey("other_id")], + verify_credential=credential.key, + logged_in=False, + ) + + self._verify_user_parameters(eppn, token_verified=True, num_proofings=1) + + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_webauthn_token_verify_signup_authn_token_to_old(self, mock_request_user_sync: MagicMock) -> None: + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.test_user.eppn + credential = self.add_security_key_to_user( + eppn=eppn, keyhandle="test", token_type="webauthn", created_ts=utc_now() + timedelta(minutes=6) + ) + self._verify_user_parameters(eppn) + + self.setup_signup_authn(eppn=eppn) + + with self.session_cookie(self.browser, eppn) as browser: + location = self._get_authn_redirect_url( + browser=browser, + endpoint="/verify-credential", + method="bankid", + frontend_action=FrontendAction.VERIFY_CREDENTIAL, + verify_credential=credential.key, + expect_success=False, + ) + assert location is None + + self._verify_user_parameters(eppn, token_verified=False, num_proofings=0) + def test_mfa_token_verify_wrong_verified_nin(self) -> None: eppn = self.test_user.eppn nin = self.test_user_wrong_nin @@ -801,6 +854,40 @@ def test_nin_verify(self, mock_request_user_sync: MagicMock, mock_get_all_navet_ assert doc["given_name"] == "Ûlla" assert doc["surname"] == "Älm" + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") + def test_nin_verify_signup_auth(self, mock_request_user_sync: MagicMock) -> None: + mock_request_user_sync.side_effect = self.request_user_sync + + eppn = self.test_unverified_user_eppn + self._verify_user_parameters(eppn, num_mfa_tokens=0, identity_verified=False) + + self.setup_signup_authn(eppn=eppn) + + self.reauthn( + "/verify-identity", + frontend_action=FrontendAction.VERIFY_IDENTITY, + expect_msg=EidasMsg.identity_verify_success, + eppn=eppn, + logged_in=False, + ) + user = self.app.central_userdb.get_user_by_eppn(eppn) + self._verify_user_parameters( + eppn, + num_mfa_tokens=0, + identity_verified=True, + num_proofings=1, + locked_identity=user.identities.nin, + proofing_method=IdentityProofingMethod.SWEDEN_CONNECT, + proofing_version=self.app.conf.freja_proofing_version, + ) + # check names + assert user.given_name == "Ûlla" + assert user.surname == "Älm" + # check proofing log + doc = self.app.proofing_log._get_documents_by_attr(attr="eduPersonPrincipalName", value=eppn)[0] + assert doc["given_name"] == "Ûlla" + assert doc["surname"] == "Älm" + @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") def test_mfa_login(self, mock_request_user_sync: MagicMock) -> None: mock_request_user_sync.side_effect = self.request_user_sync diff --git a/src/eduid/webapp/eidas/views.py b/src/eduid/webapp/eidas/views.py index c0b9d7dcf..55639545c 100644 --- a/src/eduid/webapp/eidas/views.py +++ b/src/eduid/webapp/eidas/views.py @@ -17,7 +17,6 @@ FluxData, TranslatableMsg, error_response, - need_authentication_response, success_response, ) from eduid.webapp.common.api.schemas.authn_status import StatusRequestSchema, StatusResponseSchema @@ -25,13 +24,13 @@ from eduid.webapp.common.authn.acs_enums import EidasAcsAction from eduid.webapp.common.authn.acs_registry import ACSArgs, get_action from eduid.webapp.common.authn.eduid_saml2 import process_assertion -from eduid.webapp.common.authn.utils import get_location +from eduid.webapp.common.authn.utils import check_reauthn, get_location from eduid.webapp.common.proofing.methods import ProofingMethodSAML, get_proofing_method from eduid.webapp.common.proofing.saml_helpers import create_metadata from eduid.webapp.common.session import session from eduid.webapp.common.session.namespaces import AuthnRequestRef, SP_AuthnRequest from eduid.webapp.eidas.app import current_eidas_app as current_app -from eduid.webapp.eidas.helpers import EidasMsg, attribute_remap, check_reauthn, create_authn_info +from eduid.webapp.eidas.helpers import EidasMsg, attribute_remap, create_authn_info from eduid.webapp.eidas.schemas import ( EidasCommonRequestSchema, EidasCommonResponseSchema, @@ -94,14 +93,9 @@ def verify_credential( current_app.logger.error(f"Can't find credential with id: {credential_id}") return error_response(message=EidasMsg.credential_not_found) - _need_reauthn = check_reauthn(frontend_action=_frontend_action, user=user, credential_used=credential) + _need_reauthn = check_reauthn(frontend_action=_frontend_action, user=user, credential_requested=credential) if _need_reauthn: - current_app.logger.debug(f"Need re-authentication for credential: {credential_id}") - return need_authentication_response( - frontend_action=_frontend_action, - authn_status=_need_reauthn, - payload={"credential_description": credential.description}, # type: ignore[attr-defined] - ) + return _need_reauthn result = _authn( EidasAcsAction.verify_credential, diff --git a/src/eduid/webapp/personal_data/helpers.py b/src/eduid/webapp/personal_data/helpers.py index b9bc996b6..d59479b37 100644 --- a/src/eduid/webapp/personal_data/helpers.py +++ b/src/eduid/webapp/personal_data/helpers.py @@ -2,12 +2,7 @@ import re from enum import unique -from eduid.common.config.base import FrontendAction -from eduid.userdb import User -from eduid.webapp.common.api.messages import FluxData, TranslatableMsg, need_authentication_response -from eduid.webapp.common.api.schemas.authn_status import AuthnActionStatus -from eduid.webapp.common.authn.utils import validate_authn_for_action -from eduid.webapp.personal_data.app import current_pdata_app as current_app +from eduid.webapp.common.api.messages import TranslatableMsg logger = logging.getLogger(__name__) @@ -69,17 +64,3 @@ def is_valid_chosen_given_name(given_name: str | None = None, chosen_given_name: logger.debug(f"Allowed parts: {given_name_parts}") logger.debug(f"Extra characters in display name: {chosen_given_name_parts}") return False - - -def check_reauthn(frontend_action: FrontendAction, user: User) -> FluxData | None: - """Check if a re-authentication has been performed recently enough for this action""" - - authn_status = validate_authn_for_action(config=current_app.conf, frontend_action=frontend_action, user=user) - current_app.logger.debug(f"check_reauthn called with authn status {authn_status}") - if authn_status != AuthnActionStatus.OK: - if authn_status == AuthnActionStatus.STALE: - # count stale authentications to monitor if users need more time - current_app.stats.count(name=f"{frontend_action.value}_stale_reauthn", value=1) - return need_authentication_response(frontend_action=frontend_action, authn_status=authn_status) - current_app.stats.count(name=f"{frontend_action.value}_successful_reauthn", value=1) - return None diff --git a/src/eduid/webapp/personal_data/views.py b/src/eduid/webapp/personal_data/views.py index 9a0bdd2bd..9658eb328 100644 --- a/src/eduid/webapp/personal_data/views.py +++ b/src/eduid/webapp/personal_data/views.py @@ -8,8 +8,9 @@ from eduid.webapp.common.api.decorators import MarshalWith, UnmarshalWith, require_user from eduid.webapp.common.api.messages import CommonMsg, FluxData, error_response, success_response from eduid.webapp.common.api.utils import save_and_sync_user +from eduid.webapp.common.authn.utils import check_reauthn from eduid.webapp.personal_data.app import current_pdata_app as current_app -from eduid.webapp.personal_data.helpers import PDataMsg, check_reauthn, is_valid_chosen_given_name +from eduid.webapp.personal_data.helpers import PDataMsg, is_valid_chosen_given_name from eduid.webapp.personal_data.schemas import ( AllDataResponseSchema, IdentitiesResponseSchema, diff --git a/src/eduid/webapp/security/helpers.py b/src/eduid/webapp/security/helpers.py index 4bc304b0b..70b4ed2a9 100644 --- a/src/eduid/webapp/security/helpers.py +++ b/src/eduid/webapp/security/helpers.py @@ -6,7 +6,7 @@ from fido_mds.models.webauthn import AttestationFormat -from eduid.common.config.base import EduidEnvironment, FrontendAction +from eduid.common.config.base import EduidEnvironment from eduid.common.misc.timeutil import utc_now from eduid.common.rpc.msg_relay import FullPostalAddress, NavetData from eduid.common.utils import generate_password @@ -18,10 +18,8 @@ from eduid.userdb.security import SecurityUser from eduid.userdb.user import User from eduid.webapp.common.api.helpers import set_user_names_from_official_address -from eduid.webapp.common.api.messages import FluxData, TranslatableMsg, need_authentication_response -from eduid.webapp.common.api.schemas.authn_status import AuthnActionStatus +from eduid.webapp.common.api.messages import TranslatableMsg from eduid.webapp.common.api.translation import get_user_locale -from eduid.webapp.common.authn.utils import validate_authn_for_action from eduid.webapp.security.app import current_security_app as current_app from eduid.webapp.security.webauthn_proofing import AuthenticatorInformation, is_authenticator_mfa_approved @@ -195,20 +193,6 @@ def send_termination_mail(user: User) -> None: current_app.logger.info(f"Sent termination mail to user {user} to address {email}.") -def check_reauthn(frontend_action: FrontendAction, user: User) -> FluxData | None: - """Check if a re-authentication has been performed recently enough for this action""" - - authn_status = validate_authn_for_action(config=current_app.conf, frontend_action=frontend_action, user=user) - current_app.logger.debug(f"check_reauthn called with authn status {authn_status}") - if authn_status != AuthnActionStatus.OK: - if authn_status == AuthnActionStatus.STALE: - # count stale authentications to monitor if users need more time - current_app.stats.count(name=f"{frontend_action.value}_stale_reauthn", value=1) - return need_authentication_response(frontend_action=frontend_action, authn_status=authn_status) - current_app.stats.count(name=f"{frontend_action.value}_successful_reauthn", value=1) - return None - - def update_user_official_name(security_user: SecurityUser, navet_data: NavetData) -> bool: # please mypy if security_user.identities.nin is None: diff --git a/src/eduid/webapp/security/settings/common.py b/src/eduid/webapp/security/settings/common.py index 79f054c9d..ce1bb1c7b 100644 --- a/src/eduid/webapp/security/settings/common.py +++ b/src/eduid/webapp/security/settings/common.py @@ -1,6 +1,6 @@ from datetime import timedelta -from fido2.webauthn import AttestationConveyancePreference +from fido2.webauthn import AttestationConveyancePreference, UserVerificationRequirement from fido_mds.models.fido_mds import AuthenticatorStatus from pydantic import Field @@ -45,6 +45,7 @@ class SecurityConfig( webauthn_proofing_version: str = Field(default="2022v1") webauthn_max_allowed_tokens: int = 10 webauthn_attestation: AttestationConveyancePreference | None = None + webauthn_user_verification: UserVerificationRequirement = UserVerificationRequirement.PREFERRED webauthn_allowed_user_verification_methods: list[str] = Field( default=[ "faceprint_internal", diff --git a/src/eduid/webapp/security/tests/test_app.py b/src/eduid/webapp/security/tests/test_app.py index 3a53bd27e..c176155be 100644 --- a/src/eduid/webapp/security/tests/test_app.py +++ b/src/eduid/webapp/security/tests/test_app.py @@ -7,6 +7,7 @@ from werkzeug.test import TestResponse from eduid.common.config.base import FrontendAction +from eduid.common.misc.timeutil import utc_now from eduid.common.rpc.msg_relay import DeregisteredCauseCode, DeregistrationInformation, NavetData, OfficialAddress from eduid.userdb import User from eduid.userdb.element import ElementKey @@ -618,7 +619,7 @@ def test_authn_status_no_mfa(self) -> None: def test_authn_status_credential_not_existing(self) -> None: frontend_action = FrontendAction.VERIFY_CREDENTIAL - self.set_authn_action(eppn=self.test_user_eppn, frontend_action=frontend_action, force_mfa=True) + self.set_authn_action(eppn=self.test_user_eppn, frontend_action=frontend_action, mock_mfa=True) response = self._get_authn_status(frontend_action=frontend_action, credential_id="none_existing_credential_id") self._check_error_response( response=response, @@ -628,8 +629,10 @@ def test_authn_status_credential_not_existing(self) -> None: def test_authn_status_credential_not_used(self) -> None: frontend_action = FrontendAction.VERIFY_CREDENTIAL - credential = self.add_security_key_to_user(self.test_user_eppn, keyhandle="keyhandle_1") - self.set_authn_action(eppn=self.test_user_eppn, frontend_action=frontend_action, force_mfa=True) + credential = self.add_security_key_to_user( + self.test_user_eppn, keyhandle="keyhandle_1", created_ts=utc_now() - timedelta(days=1) + ) + self.set_authn_action(eppn=self.test_user_eppn, frontend_action=frontend_action, mock_mfa=True) response = self._get_authn_status(frontend_action=frontend_action, credential_id=credential.key) self._check_success_response( response=response, diff --git a/src/eduid/webapp/security/tests/test_webauthn.py b/src/eduid/webapp/security/tests/test_webauthn.py index 48d31d95c..432cfd428 100644 --- a/src/eduid/webapp/security/tests/test_webauthn.py +++ b/src/eduid/webapp/security/tests/test_webauthn.py @@ -4,13 +4,17 @@ from typing import Any from unittest.mock import MagicMock, patch -from fido2.webauthn import AttestationObject, AuthenticatorAttachment, CollectedClientData +from fido2.webauthn import AttestationObject, AuthenticatorAttachment, CollectedClientData, UserVerificationRequirement from fido_mds import FidoMetadataStore +from future.backports.datetime import timedelta from werkzeug.http import dump_cookie +from werkzeug.test import TestResponse from eduid.common.config.base import EduidEnvironment, FrontendAction +from eduid.common.misc.timeutil import utc_now from eduid.userdb.credentials import U2F, FidoCredential, Webauthn from eduid.userdb.testing import SetupConfig +from eduid.webapp.common.api.schemas.authn_status import AuthnActionStatus from eduid.webapp.common.api.testing import CSRFTestClient, EduidAPITestCase from eduid.webapp.common.session import EduidSession from eduid.webapp.common.session.namespaces import WebauthnRegistration, WebauthnState @@ -167,12 +171,19 @@ def _add_u2f_token_to_user(self, eppn: str) -> U2F: self.app.central_userdb.save(user) return u2f_token + @staticmethod + def _response_json_to_dict(response: TestResponse) -> dict[Any, Any]: + data = response.json + assert data is not None, "No json data returned" + assert isinstance(data, dict) is True, "returned json is not a dict" + return data + def _check_session_state(self, client: CSRFTestClient) -> None: with client.session_transaction() as sess: assert isinstance(sess, EduidSession) assert sess.security.webauthn_registration is not None webauthn_state = sess.security.webauthn_registration.webauthn_state - assert webauthn_state["user_verification"] == "discouraged" + assert webauthn_state["user_verification"] == UserVerificationRequirement.PREFERRED.value assert "challenge" in webauthn_state def _check_registration_begun(self, data: dict) -> None: @@ -200,7 +211,8 @@ def _begin_register_key( existing_legacy_token: bool = False, csrf: str | None = None, check_session: bool = True, - ) -> dict: + setup_authn_action: bool = True, + ) -> TestResponse: """ Start process to register a webauthn token for the test user, possibly adding U2F or webauthn credentials before. @@ -217,11 +229,12 @@ def _begin_register_key( # Fake that user used the other security key to authenticate force_mfa = True - self.set_authn_action( - eppn=self.test_user_eppn, - frontend_action=FrontendAction.ADD_SECURITY_KEY_AUTHN, - force_mfa=force_mfa, - ) + if setup_authn_action: + self.set_authn_action( + eppn=self.test_user_eppn, + frontend_action=FrontendAction.ADD_SECURITY_KEY_AUTHN, + mock_mfa=force_mfa, + ) if existing_legacy_token: self._add_u2f_token_to_user(self.test_user_eppn) @@ -245,7 +258,7 @@ def _begin_register_key( if check_session: self._check_session_state(client) - return json.loads(response2.data) + return response2 @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") def _finish_register_key( @@ -257,7 +270,8 @@ def _finish_register_key( cred_id: bytes, existing_legacy_token: bool = False, csrf: str | None = None, - ) -> dict: + set_authn_action: bool = True, + ) -> TestResponse: """ Finish registering a webauthn token. @@ -275,11 +289,12 @@ def _finish_register_key( # Fake that user used the other security key to authenticate force_mfa = True - self.set_authn_action( - eppn=self.test_user_eppn, - frontend_action=FrontendAction.ADD_SECURITY_KEY_AUTHN, - force_mfa=force_mfa, - ) + if set_authn_action: + self.set_authn_action( + eppn=self.test_user_eppn, + frontend_action=FrontendAction.ADD_SECURITY_KEY_AUTHN, + mock_mfa=force_mfa, + ) if existing_legacy_token: self._add_u2f_token_to_user(self.test_user_eppn) @@ -306,7 +321,7 @@ def _finish_register_key( response2 = client.post( "/webauthn/register/complete", data=json.dumps(data), content_type=self.content_type_json ) - return json.loads(response2.data) + return response2 @patch("eduid.common.rpc.am_relay.AmRelay.request_user_sync") def _remove( @@ -320,7 +335,7 @@ def _remove( state_2: dict, existing_legacy_token: bool = False, csrf: str | None = None, - ) -> tuple[Webauthn, dict]: + ) -> tuple[Webauthn, TestResponse]: """ Send a POST request to remove a webauthn credential from the test user. Before sending the request, add 2 webauthn credentials (and possibly a legacy u2f credential) to the test user. @@ -353,7 +368,7 @@ def _remove( "credential_key": user_token.key, } response2 = client.post("/webauthn/remove", json=data) - return user_token, json.loads(response2.data) + return user_token, response2 def _apple_special_verify_attestation( self: FidoMetadataStore, attestation: Attestation, client_data: bytes @@ -379,106 +394,141 @@ def test_begin_no_login(self) -> None: self.assertEqual(response.status_code, 401) def test_begin_register_first_key(self) -> None: - data = self._begin_register_key() - self._check_registration_begun(data) + response = self._begin_register_key() + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_first_key_with_legacy_token(self) -> None: - data = self._begin_register_key(existing_legacy_token=True) - self._check_registration_begun(data) + response = self._begin_register_key(existing_legacy_token=True) + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_2nd_key_ater_ctap1(self) -> None: - data = self._begin_register_key(other="ctap1") - self._check_registration_begun(data) + response = self._begin_register_key(other="ctap1") + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_2nd_key_ater_ctap1_with_legacy_token(self) -> None: - data = self._begin_register_key(other="ctap1", existing_legacy_token=True) - self._check_registration_begun(data) + response = self._begin_register_key(other="ctap1", existing_legacy_token=True) + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_2nd_key_ater_ctap2(self) -> None: - data = self._begin_register_key(other="ctap2") - self._check_registration_begun(data) + response = self._begin_register_key(other="ctap2") + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_2nd_key_ater_ctap2_with_legacy_token(self) -> None: - data = self._begin_register_key(other="ctap2", existing_legacy_token=True) - self._check_registration_begun(data) + response = self._begin_register_key(other="ctap2", existing_legacy_token=True) + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_first_device(self) -> None: - data = self._begin_register_key(authenticator="platform") - self._check_registration_begun(data) + response = self._begin_register_key(authenticator="platform") + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_first_device_with_legacy_token(self) -> None: - data = self._begin_register_key(authenticator="platform", existing_legacy_token=True) - self._check_registration_begun(data) + response = self._begin_register_key(authenticator="platform", existing_legacy_token=True) + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_2nd_device_ater_ctap1(self) -> None: - data = self._begin_register_key(other="ctap1", authenticator="platform") - self._check_registration_begun(data) + response = self._begin_register_key(other="ctap1", authenticator="platform") + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_2nd_device_ater_ctap1_with_legacy_token(self) -> None: - data = self._begin_register_key(other="ctap1", authenticator="platform", existing_legacy_token=True) - self._check_registration_begun(data) + response = self._begin_register_key(other="ctap1", authenticator="platform", existing_legacy_token=True) + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_2nd_device_ater_ctap2(self) -> None: - data = self._begin_register_key(other="ctap2", authenticator="platform") - self._check_registration_begun(data) + response = self._begin_register_key(other="ctap2", authenticator="platform") + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_2nd_device_ater_ctap2_with_legacy_token(self) -> None: - data = self._begin_register_key(other="ctap2", authenticator="platform", existing_legacy_token=True) - self._check_registration_begun(data) + response = self._begin_register_key(other="ctap2", authenticator="platform", existing_legacy_token=True) + self._check_registration_begun(self._response_json_to_dict(response)) + + def test_begin_register_first_key_signup_authn(self) -> None: + self.setup_signup_authn() + response = self._begin_register_key(setup_authn_action=False) + self._check_registration_begun(self._response_json_to_dict(response)) + + def test_begin_register_first_key_signup_authn_to_old(self) -> None: + self.setup_signup_authn(user_created_at=utc_now() - timedelta(minutes=10)) + response = self._begin_register_key(check_session=False, setup_authn_action=False) + self._check_must_authenticate_response( + response=response, + type_="POST_WEBAUTHN_WEBAUTHN_REGISTER_BEGIN_FAIL", + frontend_action=FrontendAction.ADD_SECURITY_KEY_AUTHN, + authn_status=AuthnActionStatus.NOT_FOUND, + ) + + def test_begin_register_2nd_key_ater_ctap2_signup_authn(self) -> None: + self.setup_signup_authn() + response = self._begin_register_key(other="ctap2", setup_authn_action=False) + self._check_registration_begun(self._response_json_to_dict(response)) def test_begin_register_wrong_csrf_token(self) -> None: - data = self._begin_register_key(csrf="wrong-token", check_session=False) + response = self._begin_register_key(csrf="wrong-token", check_session=False) + data = self._response_json_to_dict(response) self.assertEqual(data["type"], "POST_WEBAUTHN_WEBAUTHN_REGISTER_BEGIN_FAIL") self.assertEqual(data["payload"]["error"]["csrf_token"], ["CSRF failed to validate"]) def test_finish_register_ctap1(self) -> None: - data = self._finish_register_key( + response = self._finish_register_key( client_data=CLIENT_DATA_JSON, attestation=ATTESTATION_OBJECT, state=STATE, cred_id=CREDENTIAL_ID ) - self._check_registration_complete(data) + self._check_registration_complete(self._response_json_to_dict(response)) # check that a proofing element was not written as the token is not mfa capable assert self.app.proofing_log.db_count() == 0 def test_finish_register_ctap1_with_legacy_token(self) -> None: - data = self._finish_register_key( + response = self._finish_register_key( client_data=CLIENT_DATA_JSON, attestation=ATTESTATION_OBJECT, state=STATE, cred_id=CREDENTIAL_ID, existing_legacy_token=True, ) - self._check_registration_complete(data) + self._check_registration_complete(self._response_json_to_dict(response)) # check that a proofing element was not written as the token is not mfa capable assert self.app.proofing_log.db_count() == 0 def test_finish_register_ctap2(self) -> None: - data = self._finish_register_key( + response = self._finish_register_key( client_data=CLIENT_DATA_JSON_2, attestation=ATTESTATION_OBJECT_2, state=STATE_2, cred_id=CREDENTIAL_ID_2 ) - self._check_registration_complete(data) + self._check_registration_complete(self._response_json_to_dict(response)) # check that a proofing element was written as the token is mfa capable assert self.app.proofing_log.db_count() == 1 def test_finish_register_ctap2_with_legacy_token(self) -> None: - data = self._finish_register_key( + response = self._finish_register_key( client_data=CLIENT_DATA_JSON_2, attestation=ATTESTATION_OBJECT_2, state=STATE_2, cred_id=CREDENTIAL_ID_2, existing_legacy_token=True, ) - self._check_registration_complete(data) + self._check_registration_complete(self._response_json_to_dict(response)) + # check that a proofing element was written as the token is mfa capable + assert self.app.proofing_log.db_count() == 1 + + def test_finish_register_ctap2_signup_authn(self) -> None: + self.setup_signup_authn() + response = self._finish_register_key( + set_authn_action=False, + client_data=CLIENT_DATA_JSON_2, + attestation=ATTESTATION_OBJECT_2, + state=STATE_2, + cred_id=CREDENTIAL_ID_2, + ) + self._check_registration_complete(self._response_json_to_dict(response)) # check that a proofing element was written as the token is mfa capable assert self.app.proofing_log.db_count() == 1 def test_finish_register_wrong_csrf(self) -> None: - data = self._finish_register_key( + response = self._finish_register_key( client_data=CLIENT_DATA_JSON, attestation=ATTESTATION_OBJECT, state=STATE, cred_id=CREDENTIAL_ID, csrf="wrong-token", ) + data = self._response_json_to_dict(response) self.assertEqual(data["type"], "POST_WEBAUTHN_WEBAUTHN_REGISTER_COMPLETE_FAIL") self.assertEqual(data["payload"]["error"]["csrf_token"], ["CSRF failed to validate"]) @@ -486,10 +536,10 @@ def test_remove_ctap1(self) -> None: self.set_authn_action( eppn=self.test_user_eppn, frontend_action=FrontendAction.REMOVE_SECURITY_KEY_AUTHN, - force_mfa=True, + mock_mfa=True, ) - user_token, data = self._remove( + user_token, response = self._remove( client_data=CLIENT_DATA_JSON, attestation=ATTESTATION_OBJECT, state=STATE, @@ -497,16 +547,16 @@ def test_remove_ctap1(self) -> None: attestation_2=ATTESTATION_OBJECT_2, state_2=STATE_2, ) - self._check_removal(data, user_token) + self._check_removal(self._response_json_to_dict(response), user_token) def test_remove_ctap1_with_legacy_token(self) -> None: self.set_authn_action( eppn=self.test_user_eppn, frontend_action=FrontendAction.REMOVE_SECURITY_KEY_AUTHN, - force_mfa=True, + mock_mfa=True, ) - user_token, data = self._remove( + user_token, response = self._remove( client_data=CLIENT_DATA_JSON, attestation=ATTESTATION_OBJECT, state=STATE, @@ -515,16 +565,16 @@ def test_remove_ctap1_with_legacy_token(self) -> None: state_2=STATE_2, existing_legacy_token=True, ) - self._check_removal(data, user_token) + self._check_removal(self._response_json_to_dict(response), user_token) def test_remove_ctap2(self) -> None: self.set_authn_action( eppn=self.test_user_eppn, frontend_action=FrontendAction.REMOVE_SECURITY_KEY_AUTHN, - force_mfa=True, + mock_mfa=True, ) - user_token, data = self._remove( + user_token, response = self._remove( client_data=CLIENT_DATA_JSON_2, attestation=ATTESTATION_OBJECT_2, state=STATE_2, @@ -532,16 +582,16 @@ def test_remove_ctap2(self) -> None: attestation_2=ATTESTATION_OBJECT, state_2=STATE, ) - self._check_removal(data, user_token) + self._check_removal(self._response_json_to_dict(response), user_token) def test_remove_ctap2_legacy_token(self) -> None: self.set_authn_action( eppn=self.test_user_eppn, frontend_action=FrontendAction.REMOVE_SECURITY_KEY_AUTHN, - force_mfa=True, + mock_mfa=True, ) - user_token, data = self._remove( + user_token, response = self._remove( client_data=CLIENT_DATA_JSON_2, attestation=ATTESTATION_OBJECT_2, state=STATE_2, @@ -550,7 +600,7 @@ def test_remove_ctap2_legacy_token(self) -> None: state_2=STATE, existing_legacy_token=True, ) - self._check_removal(data, user_token) + self._check_removal(self._response_json_to_dict(response), user_token) def test_remove_wrong_csrf(self) -> None: self.set_authn_action( @@ -558,7 +608,7 @@ def test_remove_wrong_csrf(self) -> None: frontend_action=FrontendAction.REMOVE_SECURITY_KEY_AUTHN, ) - _, data = self._remove( + _, response = self._remove( client_data=CLIENT_DATA_JSON, attestation=ATTESTATION_OBJECT, state=STATE, @@ -567,6 +617,7 @@ def test_remove_wrong_csrf(self) -> None: state_2=STATE_2, csrf="wrong-csrf", ) + data = self._response_json_to_dict(response) self.assertEqual(data["type"], "POST_WEBAUTHN_WEBAUTHN_REMOVE_FAIL") self.assertEqual(data["payload"]["error"]["csrf_token"], ["CSRF failed to validate"]) diff --git a/src/eduid/webapp/security/views/change_password.py b/src/eduid/webapp/security/views/change_password.py index 76ad33b7d..f1e13393b 100644 --- a/src/eduid/webapp/security/views/change_password.py +++ b/src/eduid/webapp/security/views/change_password.py @@ -9,13 +9,12 @@ from eduid.webapp.common.api.messages import CommonMsg, FluxData, error_response, success_response from eduid.webapp.common.api.utils import check_password_hash, get_zxcvbn_terms, hash_password, save_and_sync_user from eduid.webapp.common.api.validation import is_valid_password -from eduid.webapp.common.authn.utils import get_authn_for_action +from eduid.webapp.common.authn.utils import check_reauthn, get_authn_for_action from eduid.webapp.common.authn.vccs import change_password from eduid.webapp.common.session import session from eduid.webapp.security.app import current_security_app as current_app from eduid.webapp.security.helpers import ( SecurityMsg, - check_reauthn, compile_credential_list, generate_suggested_password, ) diff --git a/src/eduid/webapp/security/views/security.py b/src/eduid/webapp/security/views/security.py index c62adfade..aa08018fa 100644 --- a/src/eduid/webapp/security/views/security.py +++ b/src/eduid/webapp/security/views/security.py @@ -16,14 +16,13 @@ from eduid.webapp.common.api.messages import CommonMsg, FluxData, error_response, success_response from eduid.webapp.common.api.schemas.csrf import EmptyRequest from eduid.webapp.common.api.utils import save_and_sync_user -from eduid.webapp.common.authn.utils import get_authn_for_action, validate_authn_for_action +from eduid.webapp.common.authn.utils import check_reauthn, get_authn_for_action, validate_authn_for_action from eduid.webapp.common.authn.vccs import revoke_all_credentials from eduid.webapp.common.session import session from eduid.webapp.security.app import current_security_app as current_app from eduid.webapp.security.helpers import ( CredentialInfo, SecurityMsg, - check_reauthn, compile_credential_list, remove_identity_from_user, remove_nin_from_user, @@ -271,6 +270,6 @@ def check_authn_status(user: User, frontend_action: str, credential_id: ElementK return error_response(message=SecurityMsg.frontend_action_not_supported) authn_status = validate_authn_for_action( - config=current_app.conf, frontend_action=_frontend_action, user=user, credential_used=credential + config=current_app.conf, frontend_action=_frontend_action, user=user, credential_requested=credential ) return success_response(payload={"authn_status": authn_status.value}) diff --git a/src/eduid/webapp/security/views/webauthn.py b/src/eduid/webapp/security/views/webauthn.py index 484b7a212..38ef36acd 100644 --- a/src/eduid/webapp/security/views/webauthn.py +++ b/src/eduid/webapp/security/views/webauthn.py @@ -11,7 +11,6 @@ AuthenticatorData, CollectedClientData, PublicKeyCredentialUserEntity, - UserVerificationRequirement, ) from fido_mds.exceptions import AttestationVerificationError, MetadataValidationError from flask import Blueprint @@ -27,13 +26,12 @@ from eduid.webapp.common.api.messages import CommonMsg, FluxData, error_response, success_response from eduid.webapp.common.api.schemas.base import FluxStandardAction from eduid.webapp.common.api.utils import save_and_sync_user -from eduid.webapp.common.authn.utils import get_authn_for_action +from eduid.webapp.common.authn.utils import check_reauthn, get_authn_for_action from eduid.webapp.common.session import session from eduid.webapp.common.session.namespaces import WebauthnRegistration from eduid.webapp.security.app import current_security_app as current_app from eduid.webapp.security.helpers import ( SecurityMsg, - check_reauthn, compile_credential_list, get_approved_security_keys, ) @@ -96,6 +94,7 @@ def registration_begin(user: User, authenticator: str) -> FluxData: _auth_enum = AuthenticatorAttachment(authenticator) except ValueError: return error_response(message=SecurityMsg.invalid_authenticator) + user_webauthn_tokens = user.credentials.filter(FidoCredential) if len(user_webauthn_tokens) >= current_app.conf.webauthn_max_allowed_tokens: current_app.logger.error( @@ -118,7 +117,7 @@ def registration_begin(user: User, authenticator: str) -> FluxData: registration_data, state = server.register_begin( user=user_entity, credentials=creds, - user_verification=UserVerificationRequirement.DISCOURAGED, + user_verification=current_app.conf.webauthn_user_verification, authenticator_attachment=_auth_enum, ) session.security.webauthn_registration = WebauthnRegistration(webauthn_state=state, authenticator=_auth_enum) diff --git a/src/eduid/webapp/signup/views.py b/src/eduid/webapp/signup/views.py index 34092d977..fd1add91e 100644 --- a/src/eduid/webapp/signup/views.py +++ b/src/eduid/webapp/signup/views.py @@ -321,6 +321,7 @@ def create_user(use_suggested_password: bool, use_webauthn: bool, custom_passwor return error_response(message=CommonMsg.out_of_sync) session.signup.user_created = True + session.signup.user_created_at = utc_now() session.signup.credentials.completed = True session.common.eppn = signup_user.eppn # create payload before clearing generated password