Skip to content

Commit

Permalink
feat: make jwt decode fucntion generic
Browse files Browse the repository at this point in the history
  • Loading branch information
mumarkhan999 committed Jun 19, 2023
1 parent 69ee267 commit 8ef7018
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions edx_rest_framework_extensions/auth/jwt/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token):
# DEPR: Symmetric JWTs: https://github.com/openedx/public-engineering/issues/83

# Use add_symmetric_keys=False to only include asymmetric keys at first
key_set = _get_signing_jwk_key_set(jwt_issuer, add_symmetric_keys=False)
key_set = get_signing_jwk_key_set(jwt_issuer['SECRET_KEY'], add_symmetric_keys=False)
# .. custom_attribute_name: jwt_auth_verify_asymmetric_keys_count
# .. custom_attribute_description: Number of JWT verification keys in use for this
# verification. Should be same as number of asymmetric public keys. This is
Expand All @@ -203,7 +203,7 @@ def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token):
set_custom_attribute('jwt_auth_verify_asymmetric_keys_count', len(key_set))

try:
_verify_jwk_signature_using_keyset(token, key_set, jwt_issuer)
verify_jwk_signature_using_keyset(token, key_set, aud=jwt_issuer['AUDIENCE'])
# .. custom_attribute_name: jwt_auth_asymmetric_verified
# .. custom_attribute_description: Whether the JWT was successfully verified
# using an asymmetric key.
Expand All @@ -218,7 +218,7 @@ def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token):
# the asymmetric keys here is redundant and unnecessary, but this code is temporary and
# will be simplified once symmetric keys have been fully retired.

key_set = _get_signing_jwk_key_set(jwt_issuer, add_symmetric_keys=decode_symmetric_token)
key_set = get_signing_jwk_key_set(jwt_issuer['SECRET_KEY'], add_symmetric_keys=decode_symmetric_token)
# .. custom_attribute_name: jwt_auth_verify_all_keys_count
# .. custom_attribute_description: Number of JWT verification keys in use for this
# verification. Should be same as number of asymmetric public keys, plus one if
Expand All @@ -228,7 +228,7 @@ def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token):
set_custom_attribute('jwt_auth_verify_all_keys_count', len(key_set))

try:
_verify_jwk_signature_using_keyset(token, key_set, jwt_issuer)
verify_jwk_signature_using_keyset(token, key_set, aud=jwt_issuer['AUDIENCE'])
# .. custom_attribute_name: jwt_auth_symmetric_verified
# .. custom_attribute_description: Whether the JWT was successfully verified
# using a symmetric key.
Expand All @@ -248,7 +248,16 @@ def _verify_jwt_signature(token, jwt_issuer, decode_symmetric_token):
raise jwt.InvalidTokenError(exc_info[2]) from token_error


def _verify_jwk_signature_using_keyset(token, key_set, jwt_issuer):
def verify_jwk_signature_using_keyset(token, key_set, aud=None, iss=None, verify_signature=True, verify_exp=True):

options = {
'verify_signature': verify_signature,
'verify_exp': verify_exp,
'verify_aud': bool(aud),
'verify_iss': bool(iss)
}
data = None

for i in range(0, len(key_set)):
try:
algorithms = None
Expand All @@ -257,16 +266,19 @@ def _verify_jwk_signature_using_keyset(token, key_set, jwt_issuer):
elif key_set[i].key_type == 'oct':
algorithms = ['HS256',]

_ = jwt.decode(
data = jwt.decode(
token,
key=key_set[i].key,
algorithms=algorithms,
audience=jwt_issuer['AUDIENCE'],
issuer=iss,
audience=aud,
options=options
)
break
except Exception: # pylint: disable=broad-except
if i == len(key_set) - 1:
raise
return data


def _decode_and_verify_token(token, jwt_issuer):
Expand Down Expand Up @@ -315,21 +327,21 @@ def _decode_and_verify_token(token, jwt_issuer):
return decoded_token


def _get_signing_jwk_key_set(jwt_issuer, add_symmetric_keys=True):
def get_signing_jwk_key_set(secret_key, add_symmetric_keys=True, add_asymmetric_keys=True):
"""
Returns a JWK Keyset containing all active keys that are configured
for verifying signatures.
"""
key_set = []

# asymmetric keys
signing_jwk_set = settings.JWT_AUTH.get('JWT_PUBLIC_SIGNING_JWK_SET')
if signing_jwk_set:
if add_asymmetric_keys:
# asymmetric keys
signing_jwk_set = settings.JWT_AUTH.get('JWT_PUBLIC_SIGNING_JWK_SET')
key_set.extend(PyJWKSet.from_json(signing_jwk_set).keys)

if add_symmetric_keys:
# symmetric key
encoded_secret_key = base64url_encode(jwt_issuer['SECRET_KEY'].encode('utf-8'))
encoded_secret_key = base64url_encode(secret_key.encode('utf-8'))
key_set.append(PyJWK({'k': encoded_secret_key, 'kty': 'oct'}))

return key_set

0 comments on commit 8ef7018

Please sign in to comment.