From 8860ffbd0351a9df3e3efedb607daa9aa272eff2 Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Wed, 15 May 2024 22:03:08 +0300 Subject: [PATCH 01/13] Allow domain control calidation by email. New methods: - dcv.start_validation_email - dcv.submit_validation_email --- cert_manager/dcv.py | 48 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/cert_manager/dcv.py b/cert_manager/dcv.py index a7953e5..17c3585 100644 --- a/cert_manager/dcv.py +++ b/cert_manager/dcv.py @@ -85,6 +85,27 @@ def start_validation_cname(self, domain: str): return result.json() + def start_validation_email(self, domain: str): + """Start Domain Control Validation using Email method. + + :param string domain: The domain to validate + :return response: List of valid email addresses + """ + + url = self._url("validation", "start", "domain", "email") + data = {"domain": domain} + + try: + result = self._client.post(url, data=data) + except HTTPError as exc: + status_code = exc.response.status_code + if status_code == HTTPStatus.BAD_REQUEST: + err_response = exc.response.json() + raise ValueError(err_response["description"]) from exc + raise exc + + return result.json() + def submit_validation_cname(self, domain: str): """Finish Domain Control Validation using the CNAME method. @@ -111,3 +132,30 @@ def submit_validation_cname(self, domain: str): raise exc return result.json() + + def submit_validation_email(self, domain: str, email: str): + """Finish Domain Control Validation using the email method. + + :param string domain: The domain to validate + :param string email: validation email sent to + + :return response: a dictionary containing + status: The status of the validation + orderStatus: The status of the validation request + message: An optional message to help with debugging + """ + + url = self._url("validation", "submit", "domain", "email") + data = {"domain": domain, + "email": email} + + try: + result = self._client.post(url, data=data) + except HTTPError as exc: + status_code = exc.response.status_code + if status_code == HTTPStatus.BAD_REQUEST: + err_response = exc.response.json() + raise ValueError(err_response["description"]) from exc + raise exc + + return result.json() From 4e6f15fdb5a97ee38f4ca2d6292445887d66d4bd Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Thu, 16 May 2024 20:29:14 +0300 Subject: [PATCH 02/13] acme.get_domains: Get ACME account domains --- cert_manager/acme.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/cert_manager/acme.py b/cert_manager/acme.py index bfadb8c..0f1ae82 100644 --- a/cert_manager/acme.py +++ b/cert_manager/acme.py @@ -175,6 +175,29 @@ def add_domains(self, acme_id, domains): return result.json() + def get_domains (self, acme_id, **kwargs): + """List ACME account’s domains + + :param int acme_id: The ID of the acme account to list domains + """ + #self._change_api_version("v2") + self.__acc_domains = [] + result = self.find_domains(acme_id) + for dom in result: + self.__acc_domains.append(dom) + return self.__acc_domains + + @paginate + def find_domains(self, acme_id, **kwargs): + + params = { + self._find_params_to_api[param]: kwargs.get(param) + for param in self._find_params_to_api # pylint:disable=consider-using-dict-items + } + url = self._url(f"{acme_id}", "domain") + result = self._client.get(url, params=params) + return result.json() + def remove_domains(self, acme_id, domains): """Remove domains from an acme account. From bae3501f0882584f0c12a3641d7a3401601ee705 Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Thu, 16 May 2024 20:31:18 +0300 Subject: [PATCH 03/13] CertManager init load DomainControlValidation --- cert_manager/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cert_manager/__init__.py b/cert_manager/__init__.py index 38d3037..a9dbac9 100644 --- a/cert_manager/__init__.py +++ b/cert_manager/__init__.py @@ -10,7 +10,7 @@ from .report import Report from .smime import SMIME from .ssl import SSL - +from .dcv import DomainControlValidation __all__ = [ "ACMEAccount", "Admin", "Client", "Domain", "Organization", "PendingError", "Person", "Report", "SMIME", "SSL" ] From 1fd069559b150022f6dbfa4c75cbce4c7936dcbc Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Thu, 16 May 2024 20:40:24 +0300 Subject: [PATCH 04/13] Try to pass automatic tests --- cert_manager/dcv.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cert_manager/dcv.py b/cert_manager/dcv.py index 17c3585..da15697 100644 --- a/cert_manager/dcv.py +++ b/cert_manager/dcv.py @@ -144,7 +144,6 @@ def submit_validation_email(self, domain: str, email: str): orderStatus: The status of the validation request message: An optional message to help with debugging """ - url = self._url("validation", "submit", "domain", "email") data = {"domain": domain, "email": email} From 8a2b80f0109ba73822eda107798582e9d5d20a65 Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Thu, 12 Sep 2024 15:04:07 +0300 Subject: [PATCH 05/13] move find_domains() private --- cert_manager/acme.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cert_manager/acme.py b/cert_manager/acme.py index 0f1ae82..8d4cbdc 100644 --- a/cert_manager/acme.py +++ b/cert_manager/acme.py @@ -182,13 +182,13 @@ def get_domains (self, acme_id, **kwargs): """ #self._change_api_version("v2") self.__acc_domains = [] - result = self.find_domains(acme_id) + result = self.__find_domains(acme_id) for dom in result: self.__acc_domains.append(dom) return self.__acc_domains @paginate - def find_domains(self, acme_id, **kwargs): + def __find_domains(self, acme_id, **kwargs): params = { self._find_params_to_api[param]: kwargs.get(param) From 5316b6d28b43b85d3040409682509dbe10fced82 Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Thu, 26 Sep 2024 17:20:56 +0300 Subject: [PATCH 06/13] New method dcv.all --- cert_manager/dcv.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/cert_manager/dcv.py b/cert_manager/dcv.py index da15697..7356aba 100644 --- a/cert_manager/dcv.py +++ b/cert_manager/dcv.py @@ -5,6 +5,7 @@ from requests.exceptions import HTTPError from ._endpoint import Endpoint +from ._helpers import paginate class DomainControlValidation(Endpoint): @@ -17,7 +18,24 @@ def __init__(self, client, api_version="v1"): :param string api_version: The API version to use; the default is "v1" """ super().__init__(client=client, endpoint="/dcv", api_version=api_version) + self.__dcv_domains = None + def all(self, force=False): + """Return list of all domain control validations. + :param bool force: If set to True, force refreshing the data from the API + """ + + if (self.__dcv_domains) and (not force): + return self.__dcv_domains + + self.__dcv_domains = [] + result = self.search() + for dom in result: + self.__dcv_domains.append(dom) + + return self.__dcv_domains + + @paginate def search(self, **kwargs): """Search the DCV statuses of domains. From 888c82ceb2f9f28bcf1703bdea9a33b95d23b102 Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Fri, 27 Sep 2024 14:31:34 +0300 Subject: [PATCH 07/13] dcv.__search --- cert_manager/dcv.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cert_manager/dcv.py b/cert_manager/dcv.py index 7356aba..0388573 100644 --- a/cert_manager/dcv.py +++ b/cert_manager/dcv.py @@ -29,13 +29,17 @@ def all(self, force=False): return self.__dcv_domains self.__dcv_domains = [] - result = self.search() + result = self.__search() for dom in result: self.__dcv_domains.append(dom) return self.__dcv_domains @paginate + def __search(self, **kwargs): + """Paginated wrapper for search""" + return search(**kwargs) + def search(self, **kwargs): """Search the DCV statuses of domains. From 21ae8767f4fcbd4299c84ea45c92141ebff5c695 Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Fri, 27 Sep 2024 14:43:33 +0300 Subject: [PATCH 08/13] Fix __search --- cert_manager/dcv.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cert_manager/dcv.py b/cert_manager/dcv.py index 0388573..73af01a 100644 --- a/cert_manager/dcv.py +++ b/cert_manager/dcv.py @@ -35,11 +35,6 @@ def all(self, force=False): return self.__dcv_domains - @paginate - def __search(self, **kwargs): - """Paginated wrapper for search""" - return search(**kwargs) - def search(self, **kwargs): """Search the DCV statuses of domains. @@ -56,6 +51,11 @@ def search(self, **kwargs): return result.json() + @paginate + def __search(self, **kwargs): + """Paginated wrapper for search""" + return self.search(**kwargs) + def get_validation_status(self, domain: str): """Get the DCV statuses of a domain. From a7a29053d372a711787c596826a8743d9fdb7bc4 Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Fri, 27 Sep 2024 17:20:17 +0300 Subject: [PATCH 09/13] Add missing tests --- tests/test_acme.py | 26 ++++++ tests/test_dcv.py | 210 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 236 insertions(+) diff --git a/tests/test_acme.py b/tests/test_acme.py index 6a7d7ee..e68db4d 100644 --- a/tests/test_acme.py +++ b/tests/test_acme.py @@ -354,6 +354,32 @@ def test_ne_acme_id(self): self.assertRaises(HTTPError, acme.get, acme_id) +class TestGet_Domains(TestACMEAccount): + """Test the .get_domains method.""" + + @responses.activate + def test_acme_id(self): + """Return all domains from the specified ACME ID.""" + acme_id = 1234 + api_url = self.get_acme_account_url(acme_id) + "/domain?size=200&position=0" + valid_response = [] + + # Setup the mocked response + responses.add(responses.GET, api_url, json=valid_response, status=200) + + acme = ACMEAccount(client=self.client) + data = acme.get_domains(acme_id) + + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, api_url) + self.assertEqual(data, valid_response) + + + def test_need_acme_id(self): + """Raise an exception without an acme_id parameter.""" + acme = ACMEAccount(client=self.client) + self.assertRaises(TypeError, acme.get_domains) + def _test_create_test_factory(acme_id=1234, header="location", **kwargs): """Act as a wrapper to inject headers and parameters.""" params = ["name", "acmeServer", "organizationId", "evDetails"] diff --git a/tests/test_dcv.py b/tests/test_dcv.py index 4f169e0..fcbeb1c 100644 --- a/tests/test_dcv.py +++ b/tests/test_dcv.py @@ -72,6 +72,68 @@ def test_need_client(self): self.assertRaises(TypeError, DomainControlValidation) +class TestAll(TestDcv): + """Test the all method.""" + + def setUp(self): # pylint: disable=invalid-name + """Initialize the class.""" + # Call the inherited setUp method + super().setUp() + self.api_url = f"{self.cfixt.base_url}/dcv/v1/validation?size=200&position=0" + self.valid_response = [ + { + "domain": "*.mydomain.org", + "dcvStatus": "VALIDATED", + "dcvOrderStatus": "NOT_INITIATED", + "dcvMethod": "CNAME", + "expirationDate": "2024-03-19", + }, + { + "domain": "mydomain.org", + "dcvStatus": "VALIDATED", + "dcvOrderStatus": "NOT_INITIATED", + "dcvMethod": "CNAME", + "expirationDate": "2024-03-19", + }, + ] + + @responses.activate + def test_all(self): + """Return all the data, but it should query the API twice.""" + # Setup the mocked response + responses.add( + responses.GET, self.api_url, json=self.valid_response, status=HTTPStatus.OK + ) + + dcv = DomainControlValidation(client=self.client) + data = dcv.all() + + # Verify all the query information + # There should only be one call the first time "all" is called. + # Due to pagination, this is only guaranteed as long as the number of + # entries returned is less than the page size + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + self.assertEqual(data, self.valid_response) + + @responses.activate + def test_bad_http(self): + """Raise an exception if domains cannot be retrieved from the API.""" + # Setup the mocked response + responses.add( + responses.GET, + self.api_url, + json=self.error_response, + status=HTTPStatus.BAD_REQUEST, + ) + + domain = DomainControlValidation(client=self.client) + self.assertRaises(HTTPError, domain.all) + + # Verify all the query information + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + class TestSearch(TestDcv): """Test the .all method.""" @@ -292,6 +354,79 @@ def test_server_error(self): self.assertEqual(responses.calls[0].request.url, self.api_url) +class TestStartValidationEmail(TestDcv): + """Test the .all method.""" + + def setUp(self): # pylint: disable=invalid-name + """Initialize the class.""" + # Call the inherited setUp method + super().setUp() + self.params = { + "domain": "ccmqa.com", + } + self.api_url = f"{self.cfixt.base_url}/dcv/v1/validation/start/domain/email" + self.valid_response = { + "emails":["admin@ccmqa.com","administrator@ccmqa.com","hostmaster@ccmqa.com","postmaster@ccmqa.com","webmaster@ccmqa.com","domain-admin@ccmqa.com"] + } + + @responses.activate + def test_success(self): + """Return all the data, but it should query the API twice.""" + # Setup the mocked response + responses.add( + responses.POST, self.api_url, json=self.valid_response, status=HTTPStatus.OK + ) + + dcv = DomainControlValidation(client=self.client) + data = dcv.start_validation_email(**self.params) + + # Verify all the query information + # There should only be one call the first time "all" is called. + # Due to pagination, this is only guaranteed as long as the number of + # entries returned is less than the page size + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + self.assertEqual( + json.loads(responses.calls[0].request.body)["domain"], "ccmqa.com" + ) + self.assertEqual(data, self.valid_response) + + @responses.activate + def test_bad_req(self): + """Raise an exception if domains cannot be retrieved from the API.""" + # Setup the mocked response + responses.add( + responses.POST, + self.api_url, + json=self.error_response, + status=HTTPStatus.BAD_REQUEST, + ) + + domain = DomainControlValidation(client=self.client) + self.assertRaises(ValueError, domain.start_validation_email, **self.params) + + # Verify all the query information + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + + @responses.activate + def test_server_error(self): + """Raise an exception if domains cannot be retrieved from the API.""" + # Setup the mocked response + responses.add( + responses.POST, + self.api_url, + json=self.error_response, + status=HTTPStatus.INTERNAL_SERVER_ERROR, + ) + + domain = DomainControlValidation(client=self.client) + self.assertRaises(HTTPError, domain.start_validation_email, **self.params) + + # Verify all the query information + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + class TestSubmitValidationCname(TestDcv): """Test the .all method.""" @@ -366,3 +501,78 @@ def test_server_error(self): # Verify all the query information self.assertEqual(len(responses.calls), 1) self.assertEqual(responses.calls[0].request.url, self.api_url) + +class TestSubmitValidationEmail(TestDcv): + """Test the .all method.""" + + def setUp(self): # pylint: disable=invalid-name + """Initialize the class.""" + # Call the inherited setUp method + super().setUp() + self.params = { + "domain": "mydomain.org", + "email": "administrator@mydomain.org" + } + self.api_url = f"{self.cfixt.base_url}/dcv/v1/validation/submit/domain/email" + self.valid_response = { + "status":"VALIDATED", + "orderStatus":"SUBMITTED", + "message":"Submitted successfully"} + + @responses.activate + def test_success(self): + """Return all the data, but it should query the API twice.""" + # Setup the mocked response + responses.add( + responses.POST, self.api_url, json=self.valid_response, status=HTTPStatus.OK + ) + + dcv = DomainControlValidation(client=self.client) + data = dcv.submit_validation_email(**self.params) + + # Verify all the query information + # There should only be one call the first time "all" is called. + # Due to pagination, this is only guaranteed as long as the number of + # entries returned is less than the page size + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + self.assertEqual( + json.loads(responses.calls[0].request.body)["domain"], "mydomain.org" + ) + self.assertEqual(data, self.valid_response) + + @responses.activate + def test_bad_req(self): + """Raise an exception if domains cannot be retrieved from the API.""" + # Setup the mocked response + responses.add( + responses.POST, + self.api_url, + json=self.error_response, + status=HTTPStatus.BAD_REQUEST, + ) + + domain = DomainControlValidation(client=self.client) + self.assertRaises(ValueError, domain.submit_validation_email, **self.params) + + # Verify all the query information + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + + @responses.activate + def test_server_error(self): + """Raise an exception if domains cannot be retrieved from the API.""" + # Setup the mocked response + responses.add( + responses.POST, + self.api_url, + json=self.error_response, + status=HTTPStatus.INTERNAL_SERVER_ERROR, + ) + + domain = DomainControlValidation(client=self.client) + self.assertRaises(HTTPError, domain.submit_validation_email, **self.params) + + # Verify all the query information + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) From 1d2b222fca5b7e22d673fcd96edc8a0935b8ac2c Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Fri, 27 Sep 2024 17:41:32 +0300 Subject: [PATCH 10/13] Load DomainControlValidation on __init__ --- cert_manager/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cert_manager/__init__.py b/cert_manager/__init__.py index a9dbac9..b896a93 100644 --- a/cert_manager/__init__.py +++ b/cert_manager/__init__.py @@ -12,5 +12,5 @@ from .ssl import SSL from .dcv import DomainControlValidation __all__ = [ - "ACMEAccount", "Admin", "Client", "Domain", "Organization", "PendingError", "Person", "Report", "SMIME", "SSL" + "ACMEAccount", "Admin", "Client", "Domain", "DomainControlValidation", "Organization", "PendingError", "Person", "Report", "SMIME", "SSL" ] From 89a76e3faa880e4563a95b50b7f99ffd29ccc745 Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Fri, 27 Sep 2024 17:46:09 +0300 Subject: [PATCH 11/13] Try to pass automatic tests --- cert_manager/dcv.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cert_manager/dcv.py b/cert_manager/dcv.py index 73af01a..786d734 100644 --- a/cert_manager/dcv.py +++ b/cert_manager/dcv.py @@ -113,7 +113,6 @@ def start_validation_email(self, domain: str): :param string domain: The domain to validate :return response: List of valid email addresses """ - url = self._url("validation", "start", "domain", "email") data = {"domain": domain} From 638f94b9268173042e6f3570056b2af364dbd5f4 Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Fri, 27 Sep 2024 18:03:40 +0300 Subject: [PATCH 12/13] Try to pass automatic tests --- cert_manager/__init__.py | 1 + cert_manager/dcv.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/cert_manager/__init__.py b/cert_manager/__init__.py index b896a93..c4779e6 100644 --- a/cert_manager/__init__.py +++ b/cert_manager/__init__.py @@ -11,6 +11,7 @@ from .smime import SMIME from .ssl import SSL from .dcv import DomainControlValidation + __all__ = [ "ACMEAccount", "Admin", "Client", "Domain", "DomainControlValidation", "Organization", "PendingError", "Person", "Report", "SMIME", "SSL" ] diff --git a/cert_manager/dcv.py b/cert_manager/dcv.py index 786d734..48f6ea6 100644 --- a/cert_manager/dcv.py +++ b/cert_manager/dcv.py @@ -10,7 +10,6 @@ class DomainControlValidation(Endpoint): """Query the Sectigo Cert Manager REST API for Domain Control Validation (DCV) data.""" - def __init__(self, client, api_version="v1"): """Initialize the class. From daa1cfc397c461336c3c360a282ce9e6d0a8e62a Mon Sep 17 00:00:00 2001 From: Hirvonen Mikko Vasili Date: Fri, 27 Sep 2024 18:15:56 +0300 Subject: [PATCH 13/13] Try to pass automatic tests --- cert_manager/dcv.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cert_manager/dcv.py b/cert_manager/dcv.py index 48f6ea6..9a141ec 100644 --- a/cert_manager/dcv.py +++ b/cert_manager/dcv.py @@ -23,7 +23,6 @@ def all(self, force=False): """Return list of all domain control validations. :param bool force: If set to True, force refreshing the data from the API """ - if (self.__dcv_domains) and (not force): return self.__dcv_domains