diff --git a/src/eduid/webapp/common/api/testing.py b/src/eduid/webapp/common/api/testing.py index 95b62aede..2a64a1ae6 100644 --- a/src/eduid/webapp/common/api/testing.py +++ b/src/eduid/webapp/common/api/testing.py @@ -74,7 +74,9 @@ "xmlsec": {"level": "INFO"}, "urllib3": {"level": "INFO"}, "pymongo.serverSelection": {"level": "INFO"}, + "pymongo.connection": {"level": "INFO"}, "pymongo.command": {"level": "INFO"}, + "pymongo.topology": {"level": "INFO"}, "eduid.webapp.common.session": {"level": "INFO"}, "eduid.userdb.userdb.extra_debug": {"level": "INFO"}, "eduid.userdb.db.extra_debug": {"level": "INFO"}, diff --git a/src/eduid/webapp/idp/login_context.py b/src/eduid/webapp/idp/login_context.py index b63847a7e..c5508a058 100644 --- a/src/eduid/webapp/idp/login_context.py +++ b/src/eduid/webapp/idp/login_context.py @@ -250,7 +250,7 @@ def request_id(self) -> str | None: @property def authn_contexts(self) -> list[str]: - if not self.other_device_req.device1.authn_context: + if self.other_device_req.device1.authn_context is None: return [] return [str(self.other_device_req.device1.authn_context)] diff --git a/src/eduid/webapp/idp/tests/test_api.py b/src/eduid/webapp/idp/tests/test_api.py index 33f78474a..359269e1c 100644 --- a/src/eduid/webapp/idp/tests/test_api.py +++ b/src/eduid/webapp/idp/tests/test_api.py @@ -23,7 +23,7 @@ from eduid.userdb.mail import MailAddress from eduid.userdb.testing import SetupConfig from eduid.userdb.user import User -from eduid.webapp.common.api.testing import EduidAPITestCase +from eduid.webapp.common.api.testing import CSRFTestClient, EduidAPITestCase from eduid.webapp.common.authn.cache import IdentityCache, OutstandingQueriesCache, StateCache from eduid.webapp.common.authn.utils import get_saml2_config from eduid.webapp.common.session.namespaces import AuthnRequestRef, PySAML2Dicts @@ -35,7 +35,6 @@ __author__ = "ft" - logger = logging.getLogger(__name__) @@ -55,6 +54,16 @@ class PwAuthResult(GenericResult): cookies: dict[str, Any] = field(default_factory=dict) +@dataclass +class OtherDevice1Result(PwAuthResult): + pass + + +@dataclass +class OtherDevice2Result(PwAuthResult): + pass + + @dataclass class TouResult(GenericResult): pass @@ -78,7 +87,7 @@ class TestUser: @dataclass class LoginResultAPI: - response: TestResponse + response: TestResponse | None = None ref: str | None = None sso_cookie_val: SSOSessionId | None = None visit_count: dict[str, int] = field(default_factory=dict) @@ -86,6 +95,7 @@ class LoginResultAPI: pwauth_result: PwAuthResult | None = None tou_result: TouResult | None = None mfa_result: MfaResult | None = None + other_device1_result: OtherDevice1Result | None = None finished_result: FinishedResultAPI | None = None error: dict[str, Any] | None = None @@ -130,13 +140,40 @@ def update_config(self, config: dict[str, Any]) -> dict[str, Any]: "login_bundle_url": "https://idp.eduid.docker/test-bundle", "tou_version": "2016-v1", "default_eppn_scope": "test.scope", + "allow_other_device_logins": True, + "other_device_url": "http://test.localhost/login/other", "other_device_secret_key": "lx0sg0g21QUkiu9JAPfhx4hJ5prJtbk1PPE-OBvpiAk=", "known_devices_secret_key": "WwemHQgPm1hpx41NYaVBQpRV7BAq0OMtfF3k4H72J7c=", "geo_statistics_secret_key": "gk5cBWIZ6k-mNHWnA33ZpsgXfgH50Wi_s3mUNI9GF0o=", + "logging_config": { + "loggers": { + "saml2": {"level": "INFO"}, + "xmlsec": {"level": "INFO"}, + "urllib3": {"level": "INFO"}, + "pymongo.serverSelection": {"level": "INFO"}, + "pymongo.connection": {"level": "INFO"}, + "pymongo.command": {"level": "INFO"}, + "pymongo.topology": {"level": "INFO"}, + } + }, } ) return config + @staticmethod + def get_cookie_val(cookie_name: str, cookies: str | None) -> str | None: + """ + Get the value of a cookie. + """ + if cookies is None: + return None + + _re = f".*{cookie_name}=(.+?);.*" + _sso_cookie_re = re.match(_re, cookies) + if _sso_cookie_re: + return _sso_cookie_re.groups()[0] + return None + def _try_login( self, saml2_client: Saml2Client | None = None, @@ -145,51 +182,64 @@ def _try_login( assertion_consumer_service_url: str | None = None, test_user: TestUser | None = None, sso_cookie_val: str | None = None, + session_cookie_val: str | None = None, mfa_credential: Credential | None = None, + other_device: bool = False, + device: CSRFTestClient | None = None, + login_ref: str | None = None, ) -> LoginResultAPI: """ Try logging in to the IdP. :return: Information about how far we got (reached LoginState) and the last response instance. """ - _saml2_client = saml2_client if saml2_client is not None else self.saml2_client - - session_id: str - info: Mapping[str, Any] - (session_id, info) = _saml2_client.prepare_for_authenticate( - entityid=self.idp_entity_id, - relay_state=self.relay_state, - binding=BINDING_HTTP_REDIRECT, - requested_authn_context=authn_context, - force_authn=force_authn, - assertion_consumer_service_url=assertion_consumer_service_url, - ) - self.pysaml2_oq.set(session_id, self.relay_state) - - path = self._extract_path_from_info(info) - user: TestUser = test_user if test_user is not None else self.default_user - with self.session_cookie_anon(self.browser) as browser: - # Send SAML request to SAML endpoint, expect a redirect to the login bundle back - resp = browser.get(path) - if resp.status_code != 302: - return LoginResultAPI(response=resp) + _saml2_client = saml2_client if saml2_client is not None else self.saml2_client - redirect_loc = self._extract_path_from_response(resp) - ref = redirect_loc.split("/")[-1] + if device is None: + device = self.browser + + with self.session_cookie_anon(device) as browser: + ref = login_ref + resp = None + if login_ref is None: + # create SAML request + session_id: str + info: Mapping[str, Any] + (session_id, info) = _saml2_client.prepare_for_authenticate( + entityid=self.idp_entity_id, + relay_state=self.relay_state, + binding=BINDING_HTTP_REDIRECT, + requested_authn_context=authn_context, + force_authn=force_authn, + assertion_consumer_service_url=assertion_consumer_service_url, + ) + self.pysaml2_oq.set(session_id, self.relay_state) + + # Send SAML request to SAML endpoint, expect a redirect to the login bundle back + path = self._extract_path_from_info(info) + resp = browser.get(path) + if resp.status_code != 302: + return LoginResultAPI(response=resp) + + redirect_loc = self._extract_path_from_response(resp) + ref = redirect_loc.split("/")[-1] result = LoginResultAPI(ref=ref, response=resp) - cookie_jar = {} if sso_cookie_val is not None: cookie_jar["idpauthn"] = sso_cookie_val + if session_cookie_val is not None: + cookie_jar["sessid"] = session_cookie_val + + assert ref is not None, "Login ref needs to be set by this point" while True: logger.info(f"Main API test loop, current state: {result}") # Call the 'next' endpoint - _next = self._call_next(ref) + _next = self._call_next(device, ref) if _next.error: result.error = _next.error @@ -207,11 +257,18 @@ def _try_login( return result if _action == IdPAction.PWAUTH: + if other_device: + result.other_device1_result = self._call_other_device1( + device, target="http://test.localhost/use_other_1", ref=ref + ) + return result if not user.eppn or not user.password: logger.error(f"Can't login without username and password, aborting with result {result}") return result - result.pwauth_result = self._call_pwauth(_next.payload["target"], ref, user.eppn, user.password) + result.pwauth_result = self._call_pwauth( + device, _next.payload["target"], ref, user.eppn, user.password + ) result.sso_cookie_val = result.pwauth_result.sso_cookie_val cookie_jar.update(result.pwauth_result.cookies) @@ -228,19 +285,19 @@ def _try_login( f"No FidoCredential found for user {_user.eppn}, aborting with result {result}" ) - result.mfa_result = self._call_mfa(_next.payload["target"], ref, mfa_credential) + result.mfa_result = self._call_mfa(device, _next.payload["target"], ref, mfa_credential) if _action == IdPAction.TOU: result.tou_result = self._call_tou( - _next.payload["target"], ref, user_accepts=self.app.conf.tou_version + device, _next.payload["target"], ref, user_accepts=self.app.conf.tou_version ) if _action == IdPAction.FINISHED: result.finished_result = FinishedResultAPI(payload=_next.payload) return result - def _call_next(self, ref: str) -> NextResult: - with self.session_cookie_anon(self.browser) as client: + def _call_next(self, device: CSRFTestClient, ref: str) -> NextResult: + with self.session_cookie_anon(device) as client: with self.app.test_request_context(): with client.session_transaction() as sess: data = {"ref": ref, "csrf_token": sess.get_csrf_token()} @@ -264,8 +321,8 @@ def _call_next(self, ref: str) -> NextResult: return NextResult(payload={}, error=_error) return NextResult(payload=self.get_response_payload(response)) - def _call_pwauth(self, target: str, ref: str, username: str, password: str) -> PwAuthResult: - with self.session_cookie_anon(self.browser) as client: + def _call_pwauth(self, device: CSRFTestClient, target: str, ref: str, username: str, password: str) -> PwAuthResult: + with self.session_cookie_anon(device) as client: with self.app.test_request_context(): with client.session_transaction() as sess: data = {"ref": ref, "username": username, "password": password, "csrf_token": sess.get_csrf_token()} @@ -277,18 +334,17 @@ def _call_pwauth(self, target: str, ref: str, username: str, password: str) -> P return result # Save the SSO cookie value - _re = f".*{self.app.conf.sso_cookie.key}=(.+?);.*" - _sso_cookie_re = re.match(_re, cookies) - if _sso_cookie_re: - result.sso_cookie_val = SSOSessionId(_sso_cookie_re.groups()[0]) + sso_cookie_val = self.get_cookie_val(cookie_name=self.app.conf.sso_cookie.key, cookies=cookies) + if sso_cookie_val: + result.sso_cookie_val = SSOSessionId(sso_cookie_val) if result.sso_cookie_val: result.cookies = {self.app.conf.sso_cookie.key: result.sso_cookie_val} return result - def _call_tou(self, target: str, ref: str, user_accepts: str | None) -> TouResult: - with self.session_cookie_anon(self.browser) as client: + def _call_tou(self, device: CSRFTestClient, target: str, ref: str, user_accepts: str | None) -> TouResult: + with self.session_cookie_anon(device) as client: with self.app.test_request_context(): with client.session_transaction() as sess: data = {"ref": ref, "csrf_token": sess.get_csrf_token()} @@ -302,12 +358,18 @@ def _call_tou(self, target: str, ref: str, user_accepts: str | None) -> TouResul @patch("eduid.webapp.idp.views.mfa_auth._check_webauthn") @patch("eduid.webapp.common.authn.fido_tokens.start_token_verification") def _call_mfa( - self, target: str, ref: str, mfa_credential: Credential, mock_stv: MagicMock, mock_cw: MagicMock + self, + device: CSRFTestClient, + target: str, + ref: str, + mfa_credential: Credential, + mock_stv: MagicMock, + mock_cw: MagicMock, ) -> MfaResult: mock_stv.return_value = WebauthnChallenge(webauthn_options="{'mock_webautn_options': 'mock_webauthn_options'}") mock_cw.return_value = None # first call to mfa endpoint returns a challenge - with self.session_cookie_anon(self.browser) as client: + with self.session_cookie_anon(device) as client: with self.app.test_request_context(): with client.session_transaction() as sess: data = {"ref": ref, "csrf_token": sess.get_csrf_token()} @@ -326,7 +388,7 @@ def _call_mfa( credential=mfa_credential, authn_data=AuthnData(cred_id=mfa_credential.key, timestamp=utc_now()) ) # second call to mfa endpoint returns a result - with self.session_cookie_anon(self.browser) as client: + with self.session_cookie_anon(device) as client: with self.app.test_request_context(): with client.session_transaction() as sess: data = {"ref": ref, "csrf_token": sess.get_csrf_token()} @@ -335,6 +397,68 @@ def _call_mfa( result = MfaResult(payload=self.get_response_payload(response)) return result + def _call_other_device1( + self, device: CSRFTestClient, target: str, ref: str, response_code: str | None = None + ) -> OtherDevice1Result: + with self.session_cookie_anon(device) as client: + with self.app.test_request_context(): + with client.session_transaction() as sess: + data = {"ref": ref, "csrf_token": sess.get_csrf_token()} + if response_code is not None: + data["action"] = "SUBMIT_CODE" + data["response_code"] = response_code + response = client.post(target, data=json.dumps(data), content_type=self.content_type_json) + logger.debug(f"Other device1 endpoint returned:\n{json.dumps(response.json, indent=4)}") + result = OtherDevice1Result(payload=self.get_response_payload(response)) + session_cookie_val = self.get_cookie_val( + cookie_name=self.app.conf.flask.session_cookie_name, cookies=response.headers.get("Set-Cookie") + ) + result.cookies = {self.app.conf.flask.session_cookie_name: session_cookie_val} + if not result.cookies: + return result + + # Save the SSO cookie value + sso_cookie_val = self.get_cookie_val( + cookie_name=self.app.conf.sso_cookie.key, cookies=response.headers.get("Set-Cookie") + ) + if sso_cookie_val: + result.sso_cookie_val = SSOSessionId(sso_cookie_val) + + if result.sso_cookie_val: + result.cookies.update({self.app.conf.sso_cookie.key: result.sso_cookie_val}) + + return result + + def _call_other_device2( + self, + device: CSRFTestClient, + target: str, + state_id: str, + ) -> OtherDevice2Result: + with self.session_cookie_anon(device) as client: + with self.app.test_request_context(): + with client.session_transaction() as sess: + data = {"state_id": state_id, "csrf_token": sess.get_csrf_token()} + response = client.post(target, data=json.dumps(data), content_type=self.content_type_json) + logger.debug(f"Other device2 endpoint returned:\n{json.dumps(response.json, indent=4)}") + result = OtherDevice2Result(payload=self.get_response_payload(response)) + session_cookie_val = self.get_cookie_val( + cookie_name=self.app.conf.flask.session_cookie_name, cookies=response.headers.get("Set-Cookie") + ) + result.cookies = {self.app.conf.flask.session_cookie_name: session_cookie_val} + + # Save the SSO cookie value + sso_cookie_val = self.get_cookie_val( + cookie_name=self.app.conf.sso_cookie.key, cookies=response.headers.get("Set-Cookie") + ) + if sso_cookie_val: + result.sso_cookie_val = SSOSessionId(sso_cookie_val) + + if result.sso_cookie_val: + result.cookies.update({self.app.conf.sso_cookie.key: result.sso_cookie_val}) + + return result + def _extract_path_from_response(self, response: TestResponse) -> str: return self._extract_path_from_info({"headers": response.headers}) diff --git a/src/eduid/webapp/idp/tests/test_login.py b/src/eduid/webapp/idp/tests/test_login.py index 1c495b3b3..78607d41f 100644 --- a/src/eduid/webapp/idp/tests/test_login.py +++ b/src/eduid/webapp/idp/tests/test_login.py @@ -1,7 +1,7 @@ import logging import os from datetime import datetime, timedelta -from typing import Any +from typing import Any, cast from unittest.mock import MagicMock, patch from bson import ObjectId @@ -17,8 +17,10 @@ from eduid.userdb.maccapi.userdb import ManagedAccount from eduid.userdb.mail import MailAddressList from eduid.vccs.client import VCCSClient +from eduid.webapp.common.api.testing import CSRFTestClient from eduid.webapp.common.authn.utils import get_saml2_config from eduid.webapp.idp.helpers import IdPAction, IdPMsg +from eduid.webapp.idp.other_device.data import OtherDeviceState from eduid.webapp.idp.tests.test_api import FinishedResultAPI, IdPAPITests, PwAuthResult, TestUser from eduid.workers.am import AmCelerySingleton @@ -274,6 +276,125 @@ def test_ForceAuthn_with_existing_SSO_session(self) -> None: # Make sure the second response isn't referring to the first login request assert authn_response.in_response_to != authn_response2.in_response_to + def test_login_other_device(self) -> None: + # pre-accept ToU for this test + self.add_test_user_tou() + + # initiate other device login flow + device_1_result1 = self._try_login(other_device=True) + assert device_1_result1.ref is not None + assert device_1_result1.other_device1_result is not None + + # "read" qr code and start device 2 flow + state_id = device_1_result1.other_device1_result.payload.get("state_id") + assert state_id is not None + qr_url = device_1_result1.other_device1_result.payload.get("qr_url") + assert qr_url is not None + assert qr_url.endswith(state_id) is True + + device2 = cast(CSRFTestClient, self.app.test_client()) + device_2_result1 = self._call_other_device2( + device=device2, target="http://test.localhost/use_other_2", state_id=state_id + ) + + assert device_2_result1.payload.get("state") == OtherDeviceState.IN_PROGRESS + login_ref = device_2_result1.payload.get("login_ref") + assert login_ref is not None + + # login in with device 2 + # Patch the VCCSClient, so we do not need a vccs server + with patch.object(VCCSClient, "authenticate") as mock_vccs: + mock_vccs.return_value = True + device_2_result2 = self._try_login(device=device2, login_ref=login_ref) + + self._check_login_result( + result=device_2_result2, + visit_order=[IdPAction.PWAUTH, IdPAction.FINISHED], + ) + + # after login with device 2, retrieve the response code + device_2_result3 = self._call_other_device2( + device=device2, + target="http://test.localhost/use_other_2", + state_id=state_id, + ) + assert device_2_result3.payload.get("state") == OtherDeviceState.AUTHENTICATED.value + response_code = device_2_result3.payload.get("response_code") + assert response_code is not None + + # input the response code with device 1 + device_1_result2 = self._call_other_device1( + device=self.browser, + target="http://test.localhost/use_other_1", + ref=device_1_result1.ref, + response_code=response_code, + ) + assert device_1_result2.payload.get("state") == OtherDeviceState.FINISHED.value + + def test_login_other_device_with_accr(self) -> None: + # pre-accept ToU for this test + self.add_test_user_tou() + + # add security key to user + self.add_test_user_security_key() + + # initiate other device login flow + device_1_result1 = self._try_login( + other_device=True, + authn_context={ + "authn_context_class_ref": [EduidAuthnContextClass.REFEDS_MFA.value], + "comparison": "exact", + }, + ) + assert device_1_result1.ref is not None + assert device_1_result1.other_device1_result is not None + + # "read" qr code and start device 2 flow + state_id = device_1_result1.other_device1_result.payload.get("state_id") + assert state_id is not None + qr_url = device_1_result1.other_device1_result.payload.get("qr_url") + assert qr_url is not None + assert qr_url.endswith(state_id) is True + + device2 = cast(CSRFTestClient, self.app.test_client()) + device_2_result1 = self._call_other_device2( + device=device2, target="http://test.localhost/use_other_2", state_id=state_id + ) + + assert device_2_result1.payload.get("state") == OtherDeviceState.IN_PROGRESS + login_ref = device_2_result1.payload.get("login_ref") + assert login_ref is not None + + # login in with device 2 + # Patch the VCCSClient, so we do not need a vccs server + with patch.object(VCCSClient, "authenticate") as mock_vccs: + mock_vccs.return_value = True + device_2_result2 = self._try_login(device=device2, login_ref=login_ref) + + self._check_login_result( + result=device_2_result2, + visit_order=[IdPAction.PWAUTH, IdPAction.MFA, IdPAction.FINISHED], + ) + + # after login with device 2, retrieve the response code + device_2_result3 = self._call_other_device2( + device=device2, + target="http://test.localhost/use_other_2", + state_id=state_id, + ) + assert device_2_result3.payload.get("state") == OtherDeviceState.AUTHENTICATED.value + response_code = device_2_result3.payload.get("response_code") + assert response_code is not None + + # input the response code with device 1 + device_1_result2 = self._call_other_device1( + device=self.browser, + target="http://test.localhost/use_other_1", + ref=device_1_result1.ref, + response_code=response_code, + ) + assert device_1_result2.payload.get("state") == OtherDeviceState.FINISHED.value + def test_terminated_user(self) -> None: user = self.amdb.get_user_by_eppn(self.test_user.eppn) user.terminated = datetime.fromisoformat("2020-02-25T15:52:59.745")