Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

da_revocation: function out generation of revocation set from the crl #36933

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 95 additions & 43 deletions credentials/generate-revocation-set.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,96 @@ def fetch_crl_from_url(url: str, timeout: int) -> x509.CertificateRevocationList
logging.error('Failed to fetch a valid CRL')


def generate_revocation_set_from_crl(crl_file: x509.CertificateRevocationList,
crl_signer_certificate: x509.Certificate,
certificate_authority_name_b64: str,
certificate_akid_hex: str,
crl_signer_delegator_cert: x509.Certificate) -> dict:
"""Generate a revocation set from a CRL file.

Args:
crl_file: The CRL object containing revoked certificates
crl_signer_certificate: The certificate object used to sign the CRL
certificate_authority_name_b64: Base64 encoded issuer name
certificate_akid_hex: Hex encoded Authority Key Identifier
crl_signer_delegator_cert: crl signer delegator certificate object

Returns:
dict: A dictionary containing the revocation set data with fields:
- type: "revocation_set"
- issuer_subject_key_id: Authority Key Identifier (hex)
- issuer_name: Issuer name (base64)
- revoked_serial_numbers: List of revoked serial numbers
- crl_signer_cert: CRL signer certificate (base64 DER)
- crl_signer_delegator: Optional delegator certificate (base64 DER)
"""
serialnumber_list = []

for revoked_cert in crl_file:
try:
revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid(
x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value

if revoked_cert_issuer is not None:
# check if this really are the same thing
if revoked_cert_issuer != certificate_authority_name_b64:
logging.warning("CRL Issuer is not CRL File Issuer, continue...")
continue
except Exception:
pass

serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8'))

entry = {
"type": "revocation_set",
"issuer_subject_key_id": certificate_akid_hex,
"issuer_name": certificate_authority_name_b64,
"revoked_serial_numbers": serialnumber_list,
"crl_signer_cert": base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'),
}

if crl_signer_delegator_cert:
entry["crl_signer_delegator"] = base64.b64encode(
crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8')

return entry


def get_certificate_authority_details(crl_signer_certificate: x509.Certificate,
crl_signer_delegator_cert: x509.Certificate,
paa_certificate_object: x509.Certificate,
is_paa: bool) -> tuple[str, str]:
"""Get certificate authority name and AKID based on certificate hierarchy.

Args:
crl_signer_certificate: The CRL signer certificate
crl_signer_delegator_cert: Optional delegator certificate
paa_certificate_object: Optional PAA certificate
is_paa: Whether this is a PAA certificate

Returns:
tuple[str, str]: (certificate_authority_name_b64, certificate_akid_hex)
"""
if is_paa and not is_self_signed_certificate(crl_signer_certificate):
cert_for_details = paa_certificate_object
logging.debug("Using PAA certificate for details")
elif crl_signer_delegator_cert:
cert_for_details = crl_signer_delegator_cert
logging.debug("Using CRL Signer Delegator certificate for details")
else:
cert_for_details = crl_signer_certificate
logging.debug("Using CRL Signer certificate for details")

certificate_authority_name_b64 = get_subject_b64(cert_for_details)
certificate_akid = get_skid(cert_for_details)
certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid)

logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}")
logging.debug(f"Certificate AKID: {certificate_akid_hex}")

return certificate_authority_name_b64, certificate_akid_hex


class DCLDClient:
'''
A client for interacting with DCLD using either the REST API or command line interface (CLI).
Expand Down Expand Up @@ -325,7 +415,7 @@ def get_issuer_cert(self, cert: x509.Certificate) -> Optional[x509.Certificate]:

return issuer_certificate_object

def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
def get_revocations_points_by_skid(self, issuer_subject_key_id: str) -> list[dict]:
'''
Get revocation points by subject key ID

Expand Down Expand Up @@ -364,7 +454,6 @@ def get_revocations_points_by_skid(self, issuer_subject_key_id) -> list[dict]:
help='Determines the verbosity of script output')
def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool, use_test_net_http: bool, output: str, log_level: str):
"""Tool to construct revocation set from DCL"""

logging.basicConfig(
level=log_level,
format='%(asctime)s %(name)s %(levelname)-7s %(message)s',
Expand Down Expand Up @@ -467,56 +556,19 @@ def main(use_main_net_dcld: str, use_test_net_dcld: str, use_main_net_http: bool
# TODO: 8. Validate CRL as per Section 6.3 of RFC 5280

# 9. decide on certificate authority name and AKID
if revocation_point["isPAA"] and not is_self_signed_certificate(crl_signer_certificate):
certificate_authority_name_b64 = get_subject_b64(paa_certificate_object)
certificate_akid = get_skid(paa_certificate_object)
elif crl_signer_delegator_cert:
certificate_authority_name_b64 = get_subject_b64(crl_signer_delegator_cert)
certificate_akid = get_skid(crl_signer_delegator_cert)
else:
certificate_authority_name_b64 = get_subject_b64(crl_signer_certificate)
certificate_akid = get_skid(crl_signer_certificate)
certificate_authority_name_b64, certificate_akid_hex = get_certificate_authority_details(
crl_signer_certificate, crl_signer_delegator_cert, paa_certificate_object, revocation_point["isPAA"])

# validate issuer skid matchces with the one in revocation points
certificate_akid_hex = ''.join('{:02X}'.format(x) for x in certificate_akid)

logging.debug(f"Certificate Authority Name: {certificate_authority_name_b64}")
logging.debug(f"Certificate AKID: {certificate_akid_hex}")
logging.debug(f"revocation_point['issuerSubjectKeyID']: {revocation_point['issuerSubjectKeyID']}")

if revocation_point["issuerSubjectKeyID"] != certificate_akid_hex:
logging.warning("CRL Issuer Subject Key ID is not CRL Signer Subject Key ID, continue...")
continue

serialnumber_list = []
# 10. Iterate through the Revoked Certificates List
for revoked_cert in crl_file:
try:
revoked_cert_issuer = revoked_cert.extensions.get_extension_for_oid(
x509.CRLEntryExtensionOID.CERTIFICATE_ISSUER).value.get_values_for_type(x509.DirectoryName).value

if revoked_cert_issuer is not None:
# check if this really are the same thing
if revoked_cert_issuer != certificate_authority_name_b64:
logging.warning("CRL Issuer is not CRL File Issuer, continue...")
continue
except Exception:
logging.warning("certificateIssuer entry extension not found in CRL")
pass

serialnumber_list.append(bytes(str('{:02X}'.format(revoked_cert.serial_number)), 'utf-8').decode('utf-8'))

entry = {
"type": "revocation_set",
"issuer_subject_key_id": certificate_akid_hex,
"issuer_name": certificate_authority_name_b64,
"revoked_serial_numbers": serialnumber_list,
"crl_signer_cert": base64.b64encode(crl_signer_certificate.public_bytes(serialization.Encoding.DER)).decode('utf-8'),
}

if crl_signer_delegator_cert:
entry["crl_signer_delegator"] = base64.b64encode(
crl_signer_delegator_cert.public_bytes(serialization.Encoding.DER)).decode('utf-8'),
entry = generate_revocation_set_from_crl(crl_file, crl_signer_certificate,
certificate_authority_name_b64, certificate_akid_hex, crl_signer_delegator_cert)
logging.debug(f"Entry to append: {entry}")
revocation_set.append(entry)

Expand Down
Loading