Skip to content

Commit

Permalink
Merge pull request #712 from SUNET/lundberg_other_device_fix
Browse files Browse the repository at this point in the history
other device tests
  • Loading branch information
helylle authored Nov 7, 2024
2 parents 2d5b9cc + 7c048e7 commit f9fff6d
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 46 deletions.
2 changes: 2 additions & 0 deletions src/eduid/webapp/common/api/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@
"xmlsec": {"level": "INFO"},
"urllib3": {"level": "INFO"},
"pymongo.serverSelection": {"level": "INFO"},
"pymongo.connection": {"level": "INFO"},
"pymongo.command": {"level": "INFO"},
"pymongo.topology": {"level": "INFO"},
"eduid.webapp.common.session": {"level": "INFO"},
"eduid.userdb.userdb.extra_debug": {"level": "INFO"},
"eduid.userdb.db.extra_debug": {"level": "INFO"},
Expand Down
2 changes: 1 addition & 1 deletion src/eduid/webapp/idp/login_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def request_id(self) -> str | None:

@property
def authn_contexts(self) -> list[str]:
if not self.other_device_req.device1.authn_context:
if self.other_device_req.device1.authn_context is None:
return []
return [str(self.other_device_req.device1.authn_context)]

Expand Down
212 changes: 168 additions & 44 deletions src/eduid/webapp/idp/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from eduid.userdb.mail import MailAddress
from eduid.userdb.testing import SetupConfig
from eduid.userdb.user import User
from eduid.webapp.common.api.testing import EduidAPITestCase
from eduid.webapp.common.api.testing import CSRFTestClient, EduidAPITestCase
from eduid.webapp.common.authn.cache import IdentityCache, OutstandingQueriesCache, StateCache
from eduid.webapp.common.authn.utils import get_saml2_config
from eduid.webapp.common.session.namespaces import AuthnRequestRef, PySAML2Dicts
Expand All @@ -35,7 +35,6 @@

__author__ = "ft"


logger = logging.getLogger(__name__)


Expand All @@ -55,6 +54,16 @@ class PwAuthResult(GenericResult):
cookies: dict[str, Any] = field(default_factory=dict)


@dataclass
class OtherDevice1Result(PwAuthResult):
pass


@dataclass
class OtherDevice2Result(PwAuthResult):
pass


@dataclass
class TouResult(GenericResult):
pass
Expand All @@ -78,14 +87,15 @@ class TestUser:

@dataclass
class LoginResultAPI:
response: TestResponse
response: TestResponse | None = None
ref: str | None = None
sso_cookie_val: SSOSessionId | None = None
visit_count: dict[str, int] = field(default_factory=dict)
visit_order: list[IdPAction] = field(default_factory=list)
pwauth_result: PwAuthResult | None = None
tou_result: TouResult | None = None
mfa_result: MfaResult | None = None
other_device1_result: OtherDevice1Result | None = None
finished_result: FinishedResultAPI | None = None
error: dict[str, Any] | None = None

Expand Down Expand Up @@ -130,13 +140,40 @@ def update_config(self, config: dict[str, Any]) -> dict[str, Any]:
"login_bundle_url": "https://idp.eduid.docker/test-bundle",
"tou_version": "2016-v1",
"default_eppn_scope": "test.scope",
"allow_other_device_logins": True,
"other_device_url": "http://test.localhost/login/other",
"other_device_secret_key": "lx0sg0g21QUkiu9JAPfhx4hJ5prJtbk1PPE-OBvpiAk=",
"known_devices_secret_key": "WwemHQgPm1hpx41NYaVBQpRV7BAq0OMtfF3k4H72J7c=",
"geo_statistics_secret_key": "gk5cBWIZ6k-mNHWnA33ZpsgXfgH50Wi_s3mUNI9GF0o=",
"logging_config": {
"loggers": {
"saml2": {"level": "INFO"},
"xmlsec": {"level": "INFO"},
"urllib3": {"level": "INFO"},
"pymongo.serverSelection": {"level": "INFO"},
"pymongo.connection": {"level": "INFO"},
"pymongo.command": {"level": "INFO"},
"pymongo.topology": {"level": "INFO"},
}
},
}
)
return config

@staticmethod
def get_cookie_val(cookie_name: str, cookies: str | None) -> str | None:
"""
Get the value of a cookie.
"""
if cookies is None:
return None

_re = f".*{cookie_name}=(.+?);.*"
_sso_cookie_re = re.match(_re, cookies)
if _sso_cookie_re:
return _sso_cookie_re.groups()[0]
return None

def _try_login(
self,
saml2_client: Saml2Client | None = None,
Expand All @@ -145,51 +182,64 @@ def _try_login(
assertion_consumer_service_url: str | None = None,
test_user: TestUser | None = None,
sso_cookie_val: str | None = None,
session_cookie_val: str | None = None,
mfa_credential: Credential | None = None,
other_device: bool = False,
device: CSRFTestClient | None = None,
login_ref: str | None = None,
) -> LoginResultAPI:
"""
Try logging in to the IdP.
:return: Information about how far we got (reached LoginState) and the last response instance.
"""
_saml2_client = saml2_client if saml2_client is not None else self.saml2_client

session_id: str
info: Mapping[str, Any]
(session_id, info) = _saml2_client.prepare_for_authenticate(
entityid=self.idp_entity_id,
relay_state=self.relay_state,
binding=BINDING_HTTP_REDIRECT,
requested_authn_context=authn_context,
force_authn=force_authn,
assertion_consumer_service_url=assertion_consumer_service_url,
)
self.pysaml2_oq.set(session_id, self.relay_state)

path = self._extract_path_from_info(info)

user: TestUser = test_user if test_user is not None else self.default_user

with self.session_cookie_anon(self.browser) as browser:
# Send SAML request to SAML endpoint, expect a redirect to the login bundle back
resp = browser.get(path)
if resp.status_code != 302:
return LoginResultAPI(response=resp)
_saml2_client = saml2_client if saml2_client is not None else self.saml2_client

redirect_loc = self._extract_path_from_response(resp)
ref = redirect_loc.split("/")[-1]
if device is None:
device = self.browser

with self.session_cookie_anon(device) as browser:
ref = login_ref
resp = None
if login_ref is None:
# create SAML request
session_id: str
info: Mapping[str, Any]
(session_id, info) = _saml2_client.prepare_for_authenticate(
entityid=self.idp_entity_id,
relay_state=self.relay_state,
binding=BINDING_HTTP_REDIRECT,
requested_authn_context=authn_context,
force_authn=force_authn,
assertion_consumer_service_url=assertion_consumer_service_url,
)
self.pysaml2_oq.set(session_id, self.relay_state)

# Send SAML request to SAML endpoint, expect a redirect to the login bundle back
path = self._extract_path_from_info(info)
resp = browser.get(path)
if resp.status_code != 302:
return LoginResultAPI(response=resp)

redirect_loc = self._extract_path_from_response(resp)
ref = redirect_loc.split("/")[-1]

result = LoginResultAPI(ref=ref, response=resp)

cookie_jar = {}
if sso_cookie_val is not None:
cookie_jar["idpauthn"] = sso_cookie_val
if session_cookie_val is not None:
cookie_jar["sessid"] = session_cookie_val

assert ref is not None, "Login ref needs to be set by this point"

while True:
logger.info(f"Main API test loop, current state: {result}")

# Call the 'next' endpoint
_next = self._call_next(ref)
_next = self._call_next(device, ref)

if _next.error:
result.error = _next.error
Expand All @@ -207,11 +257,18 @@ def _try_login(
return result

if _action == IdPAction.PWAUTH:
if other_device:
result.other_device1_result = self._call_other_device1(
device, target="http://test.localhost/use_other_1", ref=ref
)
return result
if not user.eppn or not user.password:
logger.error(f"Can't login without username and password, aborting with result {result}")
return result

result.pwauth_result = self._call_pwauth(_next.payload["target"], ref, user.eppn, user.password)
result.pwauth_result = self._call_pwauth(
device, _next.payload["target"], ref, user.eppn, user.password
)
result.sso_cookie_val = result.pwauth_result.sso_cookie_val
cookie_jar.update(result.pwauth_result.cookies)

Expand All @@ -228,19 +285,19 @@ def _try_login(
f"No FidoCredential found for user {_user.eppn}, aborting with result {result}"
)

result.mfa_result = self._call_mfa(_next.payload["target"], ref, mfa_credential)
result.mfa_result = self._call_mfa(device, _next.payload["target"], ref, mfa_credential)

if _action == IdPAction.TOU:
result.tou_result = self._call_tou(
_next.payload["target"], ref, user_accepts=self.app.conf.tou_version
device, _next.payload["target"], ref, user_accepts=self.app.conf.tou_version
)

if _action == IdPAction.FINISHED:
result.finished_result = FinishedResultAPI(payload=_next.payload)
return result

def _call_next(self, ref: str) -> NextResult:
with self.session_cookie_anon(self.browser) as client:
def _call_next(self, device: CSRFTestClient, ref: str) -> NextResult:
with self.session_cookie_anon(device) as client:
with self.app.test_request_context():
with client.session_transaction() as sess:
data = {"ref": ref, "csrf_token": sess.get_csrf_token()}
Expand All @@ -264,8 +321,8 @@ def _call_next(self, ref: str) -> NextResult:
return NextResult(payload={}, error=_error)
return NextResult(payload=self.get_response_payload(response))

def _call_pwauth(self, target: str, ref: str, username: str, password: str) -> PwAuthResult:
with self.session_cookie_anon(self.browser) as client:
def _call_pwauth(self, device: CSRFTestClient, target: str, ref: str, username: str, password: str) -> PwAuthResult:
with self.session_cookie_anon(device) as client:
with self.app.test_request_context():
with client.session_transaction() as sess:
data = {"ref": ref, "username": username, "password": password, "csrf_token": sess.get_csrf_token()}
Expand All @@ -277,18 +334,17 @@ def _call_pwauth(self, target: str, ref: str, username: str, password: str) -> P
return result

# Save the SSO cookie value
_re = f".*{self.app.conf.sso_cookie.key}=(.+?);.*"
_sso_cookie_re = re.match(_re, cookies)
if _sso_cookie_re:
result.sso_cookie_val = SSOSessionId(_sso_cookie_re.groups()[0])
sso_cookie_val = self.get_cookie_val(cookie_name=self.app.conf.sso_cookie.key, cookies=cookies)
if sso_cookie_val:
result.sso_cookie_val = SSOSessionId(sso_cookie_val)

if result.sso_cookie_val:
result.cookies = {self.app.conf.sso_cookie.key: result.sso_cookie_val}

return result

def _call_tou(self, target: str, ref: str, user_accepts: str | None) -> TouResult:
with self.session_cookie_anon(self.browser) as client:
def _call_tou(self, device: CSRFTestClient, target: str, ref: str, user_accepts: str | None) -> TouResult:
with self.session_cookie_anon(device) as client:
with self.app.test_request_context():
with client.session_transaction() as sess:
data = {"ref": ref, "csrf_token": sess.get_csrf_token()}
Expand All @@ -302,12 +358,18 @@ def _call_tou(self, target: str, ref: str, user_accepts: str | None) -> TouResul
@patch("eduid.webapp.idp.views.mfa_auth._check_webauthn")
@patch("eduid.webapp.common.authn.fido_tokens.start_token_verification")
def _call_mfa(
self, target: str, ref: str, mfa_credential: Credential, mock_stv: MagicMock, mock_cw: MagicMock
self,
device: CSRFTestClient,
target: str,
ref: str,
mfa_credential: Credential,
mock_stv: MagicMock,
mock_cw: MagicMock,
) -> MfaResult:
mock_stv.return_value = WebauthnChallenge(webauthn_options="{'mock_webautn_options': 'mock_webauthn_options'}")
mock_cw.return_value = None
# first call to mfa endpoint returns a challenge
with self.session_cookie_anon(self.browser) as client:
with self.session_cookie_anon(device) as client:
with self.app.test_request_context():
with client.session_transaction() as sess:
data = {"ref": ref, "csrf_token": sess.get_csrf_token()}
Expand All @@ -326,7 +388,7 @@ def _call_mfa(
credential=mfa_credential, authn_data=AuthnData(cred_id=mfa_credential.key, timestamp=utc_now())
)
# second call to mfa endpoint returns a result
with self.session_cookie_anon(self.browser) as client:
with self.session_cookie_anon(device) as client:
with self.app.test_request_context():
with client.session_transaction() as sess:
data = {"ref": ref, "csrf_token": sess.get_csrf_token()}
Expand All @@ -335,6 +397,68 @@ def _call_mfa(
result = MfaResult(payload=self.get_response_payload(response))
return result

def _call_other_device1(
self, device: CSRFTestClient, target: str, ref: str, response_code: str | None = None
) -> OtherDevice1Result:
with self.session_cookie_anon(device) as client:
with self.app.test_request_context():
with client.session_transaction() as sess:
data = {"ref": ref, "csrf_token": sess.get_csrf_token()}
if response_code is not None:
data["action"] = "SUBMIT_CODE"
data["response_code"] = response_code
response = client.post(target, data=json.dumps(data), content_type=self.content_type_json)
logger.debug(f"Other device1 endpoint returned:\n{json.dumps(response.json, indent=4)}")
result = OtherDevice1Result(payload=self.get_response_payload(response))
session_cookie_val = self.get_cookie_val(
cookie_name=self.app.conf.flask.session_cookie_name, cookies=response.headers.get("Set-Cookie")
)
result.cookies = {self.app.conf.flask.session_cookie_name: session_cookie_val}
if not result.cookies:
return result

# Save the SSO cookie value
sso_cookie_val = self.get_cookie_val(
cookie_name=self.app.conf.sso_cookie.key, cookies=response.headers.get("Set-Cookie")
)
if sso_cookie_val:
result.sso_cookie_val = SSOSessionId(sso_cookie_val)

if result.sso_cookie_val:
result.cookies.update({self.app.conf.sso_cookie.key: result.sso_cookie_val})

return result

def _call_other_device2(
self,
device: CSRFTestClient,
target: str,
state_id: str,
) -> OtherDevice2Result:
with self.session_cookie_anon(device) as client:
with self.app.test_request_context():
with client.session_transaction() as sess:
data = {"state_id": state_id, "csrf_token": sess.get_csrf_token()}
response = client.post(target, data=json.dumps(data), content_type=self.content_type_json)
logger.debug(f"Other device2 endpoint returned:\n{json.dumps(response.json, indent=4)}")
result = OtherDevice2Result(payload=self.get_response_payload(response))
session_cookie_val = self.get_cookie_val(
cookie_name=self.app.conf.flask.session_cookie_name, cookies=response.headers.get("Set-Cookie")
)
result.cookies = {self.app.conf.flask.session_cookie_name: session_cookie_val}

# Save the SSO cookie value
sso_cookie_val = self.get_cookie_val(
cookie_name=self.app.conf.sso_cookie.key, cookies=response.headers.get("Set-Cookie")
)
if sso_cookie_val:
result.sso_cookie_val = SSOSessionId(sso_cookie_val)

if result.sso_cookie_val:
result.cookies.update({self.app.conf.sso_cookie.key: result.sso_cookie_val})

return result

def _extract_path_from_response(self, response: TestResponse) -> str:
return self._extract_path_from_info({"headers": response.headers})

Expand Down
Loading

0 comments on commit f9fff6d

Please sign in to comment.