diff --git a/README.md b/README.md index 2c506c3..e625aa0 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ while(True): cert_pem = ssl.collect(cert_id=result["sslId"], cert_format="x509CO") print(cert_pem) break - except Pending: + except PendingError: print("Certificate is still pending...sleeping for 60s") sleep(60) continue diff --git a/cert_manager/__init__.py b/cert_manager/__init__.py index 0242811..38d3037 100644 --- a/cert_manager/__init__.py +++ b/cert_manager/__init__.py @@ -1,6 +1,6 @@ """Initialize the cert_manager module.""" -from ._helpers import Pending +from ._helpers import PendingError from .acme import ACMEAccount from .admin import Admin from .client import Client @@ -11,4 +11,6 @@ from .smime import SMIME from .ssl import SSL -__all__ = ["ACMEAccount", "Admin", "Client", "Domain", "Organization", "Pending", "Person", "Report", "SMIME", "SSL"] +__all__ = [ + "ACMEAccount", "Admin", "Client", "Domain", "Organization", "PendingError", "Person", "Report", "SMIME", "SSL" +] diff --git a/cert_manager/_certificates.py b/cert_manager/_certificates.py index 96d5999..85e9be8 100644 --- a/cert_manager/_certificates.py +++ b/cert_manager/_certificates.py @@ -5,7 +5,7 @@ from requests.exceptions import HTTPError from ._endpoint import Endpoint -from ._helpers import CustomFieldsError, Pending +from ._helpers import CustomFieldsError, PendingError LOGGER = logging.getLogger(__name__) @@ -41,6 +41,7 @@ def __init__(self, client, endpoint, api_version="v1"): # Set to None initially. Will be filled in by methods later. self.__cert_types = None self.__custom_fields = None + self.reason_maxlen = 512 @property def types(self): @@ -111,7 +112,7 @@ def _validate_custom_fields(self, custom_fields): def collect(self, cert_id, cert_format): """Retrieve an existing certificate from the API. - This method will raise a Pending exception if the certificate is still in a pending state. + This method will raise a PendingError exception if the certificate is still in a pending state. :param int cert_id: The certificate ID :param str cert_format: The format in which to retreive the certificate. Allowed values: *self.valid_formats* @@ -125,7 +126,7 @@ def collect(self, cert_id, cert_format): try: result = self._client.get(url) except HTTPError as exc: - raise Pending(f"certificate {cert_id} still in 'pending' state") from exc + raise PendingError(f"certificate {cert_id} still in 'pending' state") from exc # The certificate is ready for collection return result.content.decode(result.encoding) @@ -228,8 +229,8 @@ def revoke(self, cert_id, reason=""): url = self._url(f"/revoke/{cert_id}") # Sectigo has a 512 character limit on the "reason" message, so catch that here. - if (not reason) or (len(reason) > 511): - raise ValueError("Sectigo limit: reason must be > 0 character and < 512 characters") + if not reason or len(reason) >= self.reason_maxlen: + raise ValueError(f"Sectigo limit: reason must be > 0 character and < {self.reason_maxlen} characters") data = {"reason": reason} diff --git a/cert_manager/_helpers.py b/cert_manager/_helpers.py index fd6ccfc..cc059d0 100644 --- a/cert_manager/_helpers.py +++ b/cert_manager/_helpers.py @@ -35,23 +35,18 @@ def log_traffic(*args, **kwargs): # Check if the URL or headers exist in the parameters # Note: *self* will be the first argument, so actual arguments start after that. - url = kwargs.get("url", "") - if not url: - if len(args) > 1: - url = args[1] - headers = kwargs.get("headers", "") - if not headers: - if len(args) > 2: - headers = args[2] - data = kwargs.get("data", "") - if not data: - if len(args) > 3: - data = args[3] + params = ["url", "headers", "data"] + values = {} + for index, param in enumerate(params, start=1): + values[param] = kwargs.get(param, "") + if not values[param]: + if len(args) > index: + values[param] = args[index] # Print out before messages with URL and header data - traffic_logger.debug(f"Performing a {func_name} on url: {url}") - traffic_logger.debug(f"Extra request headers: {headers}") - traffic_logger.debug(f"Data: {data}") + traffic_logger.debug(f"Performing a {func_name} on url: {values['url']}") + traffic_logger.debug(f"Extra request headers: {values['headers']}") + traffic_logger.debug(f"Data: {values['data']}") # Run the wrapped function try: @@ -148,12 +143,12 @@ def decorator(*args, **kwargs): return decorator -class Pending(Exception): +class PendingError(Exception): """Serve as a generic Exception indicating a certificate is in a pending state.""" CODE = -183 -class Revoked(Exception): +class RevokedError(Exception): """Serve as a generic Exception indicating a certificate has been revoked.""" CODE = -192 diff --git a/cert_manager/smime.py b/cert_manager/smime.py index 1740795..27df9bd 100644 --- a/cert_manager/smime.py +++ b/cert_manager/smime.py @@ -4,7 +4,7 @@ from requests.exceptions import HTTPError from ._certificates import Certificates -from ._helpers import Pending, Revoked, paginate, version_hack +from ._helpers import PendingError, RevokedError, paginate, version_hack LOGGER = logging.getLogger(__name__) @@ -19,6 +19,7 @@ def __init__(self, client, api_version="v1"): :param string api_version: The API version to use; the default is "v1" """ super().__init__(client=client, endpoint="/smime", api_version=api_version) + self.reason_maxlen = 512 @paginate def list(self, **kwargs): @@ -118,7 +119,7 @@ def enroll(self, **kwargs): def collect(self, cert_id): """Retrieve an existing client certificate from the API. - This method will raise a Pending exception if the certificate is still in a pending state. + This method will raise a PendingError exception if the certificate is still in a pending state. :param int cert_id: The Certificate ID given on enroll success :return str: the string representing the certificate in the requested format @@ -131,10 +132,10 @@ def collect(self, cert_id): result = self._client.get(url) except HTTPError as exc: err_code = exc.response.json().get("code") - if err_code == Revoked.CODE: - raise Revoked(f"certificate {cert_id} in 'revoked' state") from exc - if err_code == Pending.CODE: - raise Pending(f"certificate {cert_id} still in 'pending' state") from exc + if err_code == RevokedError.CODE: + raise RevokedError(f"certificate {cert_id} in 'revoked' state") from exc + if err_code == PendingError.CODE: + raise PendingError(f"certificate {cert_id} still in 'pending' state") from exc raise exc # The certificate is ready for collection @@ -193,8 +194,8 @@ def revoke(self, cert_id, reason=""): raise ValueError("Argument 'cert_id' can't be None") # Sectigo has a 512 character limit on the "reason" message, so catch that here. - if (not reason) or (len(reason) > 511): - raise ValueError("Sectigo limit: reason must be > 0 character and < 512 characters") + if not reason or len(reason) >= self.reason_maxlen: + raise ValueError(f"Sectigo limit: reason must be > 0 character and < {self.reason_maxlen} characters") data = {"reason": reason} self._client.post(url, data=data) @@ -212,8 +213,8 @@ def revoke_by_email(self, email, reason=""): raise ValueError("Argument 'email' can't be empty or None") # Sectigo has a 512 character limit on the "reason" message, so catch that here. - if (not reason) or (len(reason) > 511): - raise ValueError("Sectigo limit: reason must be > 0 character and < 512 characters") + if not reason or len(reason) >= self.reason_maxlen: + raise ValueError(f"Sectigo limit: reason must be > 0 character and < {self.reason_maxlen} characters") data = {"email": email, "reason": reason} self._client.post(url, data=data) diff --git a/tests/test_certificates.py b/tests/test_certificates.py index 1b7035b..ab9f48c 100644 --- a/tests/test_certificates.py +++ b/tests/test_certificates.py @@ -9,7 +9,7 @@ from testtools import TestCase from cert_manager._certificates import Certificates -from cert_manager._helpers import Pending +from cert_manager._helpers import PendingError from .lib.testbase import ClientFixture @@ -49,10 +49,11 @@ def fake_csr(): @staticmethod def fake_cert(): """Build a fake certificate to use with tests.""" + numrows = 27 data = "-----BEGIN CERTIFICATE-----\n" for row in range(1, 33): char = chr(row + 64) - if row > 26: + if row >= numrows: char = chr(row + 70) data += f"{char * 64}\n" data += "-----END CERTIFICATE-----\n" @@ -254,12 +255,12 @@ def test_success(self): @responses.activate def test_pending(self): - """Raise a Pending exception if an error status code is returned.""" + """Raise a PendingError exception if an error status code is returned.""" # Setup the mocked response responses.add(responses.GET, self.test_url, body="", status=404) # Call the function, expecting an exception - self.assertRaises(Pending, self.certobj.collect, self.test_id, self.test_type) + self.assertRaises(PendingError, self.certobj.collect, self.test_id, self.test_type) # Verify all the query information self.assertEqual(len(responses.calls), 1) diff --git a/tests/test_smime.py b/tests/test_smime.py index 8149083..a06b5e0 100644 --- a/tests/test_smime.py +++ b/tests/test_smime.py @@ -9,7 +9,7 @@ from requests import HTTPError from testtools import TestCase -from cert_manager._helpers import Pending, Revoked +from cert_manager._helpers import PendingError, RevokedError from cert_manager.smime import SMIME from .lib.testbase import ClientFixture @@ -323,14 +323,14 @@ def test_no_cert_id(self): @responses.activate def test_pending(self): - """Raise a Pending exception if an Http error with the pending code in the body is returned.""" + """Raise a PendingError exception if an Http error with the pending code in the body is returned.""" # Setup the mocked response - body = json.dumps({"code": Pending.CODE, "description": "Certificate is not collectable."}) + body = json.dumps({"code": PendingError.CODE, "description": "Certificate is not collectable."}) responses.add(responses.GET, self.test_url, body=body, status=400) # Call the function, expecting an exception smime = SMIME(client=self.client) - self.assertRaises(Pending, smime.collect, self.test_id) + self.assertRaises(PendingError, smime.collect, self.test_id) # Verify all the query information self.assertEqual(len(responses.calls), 1) @@ -338,14 +338,14 @@ def test_pending(self): @responses.activate def test_revoked(self): - """Raise a Revoked exception if an Http error with the pending code in the body is returned.""" + """Raise a RevokedError exception if an Http error with the pending code in the body is returned.""" # Setup the mocked response - body = json.dumps({"code": Revoked.CODE, "description": "The Certificate has been revoked!"}) + body = json.dumps({"code": RevokedError.CODE, "description": "The Certificate has been revoked!"}) responses.add(responses.GET, self.test_url, body=body, status=400) # Call the function, expecting an exception smime = SMIME(client=self.client) - self.assertRaises(Revoked, smime.collect, self.test_id) + self.assertRaises(RevokedError, smime.collect, self.test_id) # Verify all the query information self.assertEqual(len(responses.calls), 1)