diff --git a/edx_rest_framework_extensions/auth/jwt/decoder.py b/edx_rest_framework_extensions/auth/jwt/decoder.py index 8c57fe13..3ff7876b 100644 --- a/edx_rest_framework_extensions/auth/jwt/decoder.py +++ b/edx_rest_framework_extensions/auth/jwt/decoder.py @@ -194,7 +194,7 @@ def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token): # DEPR: Symmetric JWTs: https://github.com/openedx/public-engineering/issues/83 # Use add_symmetric_keys=False to only include asymmetric keys at first - key_set = _get_signing_jwk_key_set(jwt_issuer, add_symmetric_keys=False) + key_set = get_signing_jwk_key_set(jwt_issuer['SECRET_KEY'], add_symmetric_keys=False) # .. custom_attribute_name: jwt_auth_verify_asymmetric_keys_count # .. custom_attribute_description: Number of JWT verification keys in use for this # verification. Should be same as number of asymmetric public keys. This is @@ -203,7 +203,7 @@ def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token): set_custom_attribute('jwt_auth_verify_asymmetric_keys_count', len(key_set)) try: - _verify_jwk_signature_using_keyset(token, key_set, jwt_issuer) + verify_jwk_signature_using_keyset(token, key_set, aud=jwt_issuer['AUDIENCE']) # .. custom_attribute_name: jwt_auth_asymmetric_verified # .. custom_attribute_description: Whether the JWT was successfully verified # using an asymmetric key. @@ -218,7 +218,7 @@ def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token): # the asymmetric keys here is redundant and unnecessary, but this code is temporary and # will be simplified once symmetric keys have been fully retired. - key_set = _get_signing_jwk_key_set(jwt_issuer, add_symmetric_keys=decode_symmetric_token) + key_set = get_signing_jwk_key_set(jwt_issuer['SECRET_KEY'], add_symmetric_keys=decode_symmetric_token) # .. custom_attribute_name: jwt_auth_verify_all_keys_count # .. custom_attribute_description: Number of JWT verification keys in use for this # verification. Should be same as number of asymmetric public keys, plus one if @@ -228,7 +228,7 @@ def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token): set_custom_attribute('jwt_auth_verify_all_keys_count', len(key_set)) try: - _verify_jwk_signature_using_keyset(token, key_set, jwt_issuer) + verify_jwk_signature_using_keyset(token, key_set, aud=jwt_issuer['AUDIENCE']) # .. custom_attribute_name: jwt_auth_symmetric_verified # .. custom_attribute_description: Whether the JWT was successfully verified # using a symmetric key. @@ -248,7 +248,16 @@ def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token): raise jwt.InvalidTokenError(exc_info[2]) from token_error -def _verify_jwk_signature_using_keyset(token, key_set, jwt_issuer): +def verify_jwk_signature_using_keyset(token, key_set, aud=None, iss=None, verify_signature=True, verify_exp=True): + + options = { + 'verify_signature': verify_signature, + 'verify_exp': verify_exp, + 'verify_aud': bool(aud), + 'verify_iss': bool(iss) + } + data = None + for i in range(0, len(key_set)): try: algorithms = None @@ -257,16 +266,19 @@ def _verify_jwk_signature_using_keyset(token, key_set, jwt_issuer): elif key_set[i].key_type == 'oct': algorithms = ['HS256',] - _ = jwt.decode( + data = jwt.decode( token, key=key_set[i].key, algorithms=algorithms, - audience=jwt_issuer['AUDIENCE'], + issuer=iss, + audience=aud, + options=options ) break except Exception: # pylint: disable=broad-except if i == len(key_set) - 1: raise + return data def _decode_and_verify_token(token, jwt_issuer): @@ -315,21 +327,21 @@ def _decode_and_verify_token(token, jwt_issuer): return decoded_token -def _get_signing_jwk_key_set(jwt_issuer, add_symmetric_keys=True): +def get_signing_jwk_key_set(secret_key, add_symmetric_keys=True, add_asymmetric_keys=True): """ Returns a JWK Keyset containing all active keys that are configured for verifying signatures. """ key_set = [] - # asymmetric keys - signing_jwk_set = settings.JWT_AUTH.get('JWT_PUBLIC_SIGNING_JWK_SET') - if signing_jwk_set: + if add_asymmetric_keys: + # asymmetric keys + signing_jwk_set = settings.JWT_AUTH.get('JWT_PUBLIC_SIGNING_JWK_SET') key_set.extend(PyJWKSet.from_json(signing_jwk_set).keys) if add_symmetric_keys: # symmetric key - encoded_secret_key = base64url_encode(jwt_issuer['SECRET_KEY'].encode('utf-8')) + encoded_secret_key = base64url_encode(secret_key.encode('utf-8')) key_set.append(PyJWK({'k': encoded_secret_key, 'kty': 'oct'})) return key_set