Skip to content

Commit

Permalink
Change exceptions Pending and Revoked to PendingError and RevokedErro…
Browse files Browse the repository at this point in the history
…r (linting)
  • Loading branch information
coreone committed Oct 12, 2023
1 parent 040c5b2 commit 05cbba8
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 46 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ while(True):
cert_pem = ssl.collect(cert_id=result["sslId"], cert_format="x509CO")
print(cert_pem)
break
except Pending:
except PendingError:
print("Certificate is still pending...sleeping for 60s")
sleep(60)
continue
Expand Down
6 changes: 4 additions & 2 deletions cert_manager/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Initialize the cert_manager module."""

from ._helpers import Pending
from ._helpers import PendingError
from .acme import ACMEAccount
from .admin import Admin
from .client import Client
Expand All @@ -11,4 +11,6 @@
from .smime import SMIME
from .ssl import SSL

__all__ = ["ACMEAccount", "Admin", "Client", "Domain", "Organization", "Pending", "Person", "Report", "SMIME", "SSL"]
__all__ = [
"ACMEAccount", "Admin", "Client", "Domain", "Organization", "PendingError", "Person", "Report", "SMIME", "SSL"
]
11 changes: 6 additions & 5 deletions cert_manager/_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from requests.exceptions import HTTPError

from ._endpoint import Endpoint
from ._helpers import CustomFieldsError, Pending
from ._helpers import CustomFieldsError, PendingError

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -41,6 +41,7 @@ def __init__(self, client, endpoint, api_version="v1"):
# Set to None initially. Will be filled in by methods later.
self.__cert_types = None
self.__custom_fields = None
self.reason_maxlen = 512

@property
def types(self):
Expand Down Expand Up @@ -111,7 +112,7 @@ def _validate_custom_fields(self, custom_fields):
def collect(self, cert_id, cert_format):
"""Retrieve an existing certificate from the API.
This method will raise a Pending exception if the certificate is still in a pending state.
This method will raise a PendingError exception if the certificate is still in a pending state.
:param int cert_id: The certificate ID
:param str cert_format: The format in which to retreive the certificate. Allowed values: *self.valid_formats*
Expand All @@ -125,7 +126,7 @@ def collect(self, cert_id, cert_format):
try:
result = self._client.get(url)
except HTTPError as exc:
raise Pending(f"certificate {cert_id} still in 'pending' state") from exc
raise PendingError(f"certificate {cert_id} still in 'pending' state") from exc

# The certificate is ready for collection
return result.content.decode(result.encoding)
Expand Down Expand Up @@ -228,8 +229,8 @@ def revoke(self, cert_id, reason=""):
url = self._url(f"/revoke/{cert_id}")

# Sectigo has a 512 character limit on the "reason" message, so catch that here.
if (not reason) or (len(reason) > 511):
raise ValueError("Sectigo limit: reason must be > 0 character and < 512 characters")
if not reason or len(reason) >= self.reason_maxlen:
raise ValueError(f"Sectigo limit: reason must be > 0 character and < {self.reason_maxlen} characters")

data = {"reason": reason}

Expand Down
29 changes: 12 additions & 17 deletions cert_manager/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,18 @@ def log_traffic(*args, **kwargs):

# Check if the URL or headers exist in the parameters
# Note: *self* will be the first argument, so actual arguments start after that.
url = kwargs.get("url", "")
if not url:
if len(args) > 1:
url = args[1]
headers = kwargs.get("headers", "")
if not headers:
if len(args) > 2:
headers = args[2]
data = kwargs.get("data", "")
if not data:
if len(args) > 3:
data = args[3]
params = ["url", "headers", "data"]
values = {}
for index, param in enumerate(params, start=1):
values[param] = kwargs.get(param, "")
if not values[param]:
if len(args) > index:
values[param] = args[index]

# Print out before messages with URL and header data
traffic_logger.debug(f"Performing a {func_name} on url: {url}")
traffic_logger.debug(f"Extra request headers: {headers}")
traffic_logger.debug(f"Data: {data}")
traffic_logger.debug(f"Performing a {func_name} on url: {values['url']}")
traffic_logger.debug(f"Extra request headers: {values['headers']}")
traffic_logger.debug(f"Data: {values['data']}")

# Run the wrapped function
try:
Expand Down Expand Up @@ -148,12 +143,12 @@ def decorator(*args, **kwargs):
return decorator


class Pending(Exception):
class PendingError(Exception):
"""Serve as a generic Exception indicating a certificate is in a pending state."""
CODE = -183


class Revoked(Exception):
class RevokedError(Exception):
"""Serve as a generic Exception indicating a certificate has been revoked."""
CODE = -192

Expand Down
21 changes: 11 additions & 10 deletions cert_manager/smime.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from requests.exceptions import HTTPError

from ._certificates import Certificates
from ._helpers import Pending, Revoked, paginate, version_hack
from ._helpers import PendingError, RevokedError, paginate, version_hack

LOGGER = logging.getLogger(__name__)

Expand All @@ -19,6 +19,7 @@ def __init__(self, client, api_version="v1"):
:param string api_version: The API version to use; the default is "v1"
"""
super().__init__(client=client, endpoint="/smime", api_version=api_version)
self.reason_maxlen = 512

@paginate
def list(self, **kwargs):
Expand Down Expand Up @@ -118,7 +119,7 @@ def enroll(self, **kwargs):
def collect(self, cert_id):
"""Retrieve an existing client certificate from the API.
This method will raise a Pending exception if the certificate is still in a pending state.
This method will raise a PendingError exception if the certificate is still in a pending state.
:param int cert_id: The Certificate ID given on enroll success
:return str: the string representing the certificate in the requested format
Expand All @@ -131,10 +132,10 @@ def collect(self, cert_id):
result = self._client.get(url)
except HTTPError as exc:
err_code = exc.response.json().get("code")
if err_code == Revoked.CODE:
raise Revoked(f"certificate {cert_id} in 'revoked' state") from exc
if err_code == Pending.CODE:
raise Pending(f"certificate {cert_id} still in 'pending' state") from exc
if err_code == RevokedError.CODE:
raise RevokedError(f"certificate {cert_id} in 'revoked' state") from exc
if err_code == PendingError.CODE:
raise PendingError(f"certificate {cert_id} still in 'pending' state") from exc
raise exc

# The certificate is ready for collection
Expand Down Expand Up @@ -193,8 +194,8 @@ def revoke(self, cert_id, reason=""):
raise ValueError("Argument 'cert_id' can't be None")

# Sectigo has a 512 character limit on the "reason" message, so catch that here.
if (not reason) or (len(reason) > 511):
raise ValueError("Sectigo limit: reason must be > 0 character and < 512 characters")
if not reason or len(reason) >= self.reason_maxlen:
raise ValueError(f"Sectigo limit: reason must be > 0 character and < {self.reason_maxlen} characters")

data = {"reason": reason}
self._client.post(url, data=data)
Expand All @@ -212,8 +213,8 @@ def revoke_by_email(self, email, reason=""):
raise ValueError("Argument 'email' can't be empty or None")

# Sectigo has a 512 character limit on the "reason" message, so catch that here.
if (not reason) or (len(reason) > 511):
raise ValueError("Sectigo limit: reason must be > 0 character and < 512 characters")
if not reason or len(reason) >= self.reason_maxlen:
raise ValueError(f"Sectigo limit: reason must be > 0 character and < {self.reason_maxlen} characters")

data = {"email": email, "reason": reason}
self._client.post(url, data=data)
9 changes: 5 additions & 4 deletions tests/test_certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from testtools import TestCase

from cert_manager._certificates import Certificates
from cert_manager._helpers import Pending
from cert_manager._helpers import PendingError

from .lib.testbase import ClientFixture

Expand Down Expand Up @@ -49,10 +49,11 @@ def fake_csr():
@staticmethod
def fake_cert():
"""Build a fake certificate to use with tests."""
numrows = 27
data = "-----BEGIN CERTIFICATE-----\n"
for row in range(1, 33):
char = chr(row + 64)
if row > 26:
if row >= numrows:
char = chr(row + 70)
data += f"{char * 64}\n"
data += "-----END CERTIFICATE-----\n"
Expand Down Expand Up @@ -254,12 +255,12 @@ def test_success(self):

@responses.activate
def test_pending(self):
"""Raise a Pending exception if an error status code is returned."""
"""Raise a PendingError exception if an error status code is returned."""
# Setup the mocked response
responses.add(responses.GET, self.test_url, body="", status=404)

# Call the function, expecting an exception
self.assertRaises(Pending, self.certobj.collect, self.test_id, self.test_type)
self.assertRaises(PendingError, self.certobj.collect, self.test_id, self.test_type)

# Verify all the query information
self.assertEqual(len(responses.calls), 1)
Expand Down
14 changes: 7 additions & 7 deletions tests/test_smime.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from requests import HTTPError
from testtools import TestCase

from cert_manager._helpers import Pending, Revoked
from cert_manager._helpers import PendingError, RevokedError
from cert_manager.smime import SMIME

from .lib.testbase import ClientFixture
Expand Down Expand Up @@ -323,29 +323,29 @@ def test_no_cert_id(self):

@responses.activate
def test_pending(self):
"""Raise a Pending exception if an Http error with the pending code in the body is returned."""
"""Raise a PendingError exception if an Http error with the pending code in the body is returned."""
# Setup the mocked response
body = json.dumps({"code": Pending.CODE, "description": "Certificate is not collectable."})
body = json.dumps({"code": PendingError.CODE, "description": "Certificate is not collectable."})
responses.add(responses.GET, self.test_url, body=body, status=400)

# Call the function, expecting an exception
smime = SMIME(client=self.client)
self.assertRaises(Pending, smime.collect, self.test_id)
self.assertRaises(PendingError, smime.collect, self.test_id)

# Verify all the query information
self.assertEqual(len(responses.calls), 1)
self.assertEqual(responses.calls[0].request.url, self.test_url)

@responses.activate
def test_revoked(self):
"""Raise a Revoked exception if an Http error with the pending code in the body is returned."""
"""Raise a RevokedError exception if an Http error with the pending code in the body is returned."""
# Setup the mocked response
body = json.dumps({"code": Revoked.CODE, "description": "The Certificate has been revoked!"})
body = json.dumps({"code": RevokedError.CODE, "description": "The Certificate has been revoked!"})
responses.add(responses.GET, self.test_url, body=body, status=400)

# Call the function, expecting an exception
smime = SMIME(client=self.client)
self.assertRaises(Revoked, smime.collect, self.test_id)
self.assertRaises(RevokedError, smime.collect, self.test_id)

# Verify all the query information
self.assertEqual(len(responses.calls), 1)
Expand Down

0 comments on commit 05cbba8

Please sign in to comment.