From 33c71e7ce886748a0dfb85c937fd04b9ee5b48e4 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Wed, 3 Apr 2024 23:29:04 +0000 Subject: [PATCH 1/5] style: format code with Autopep8, Black, isort and Yapf This commit fixes the style issues introduced in ca95975 according to the output from Autopep8, Black, isort and Yapf. Details: https://github.com/One-Click-Auth/TrustAuthx-Py-SDK/pull/53 --- setup.py | 36 +-- trustauthx/authlite.py | 488 ++++++++++++++++++++++++----------------- 2 files changed, 303 insertions(+), 221 deletions(-) diff --git a/setup.py b/setup.py index 1a011be..21b3409 100644 --- a/setup.py +++ b/setup.py @@ -1,27 +1,27 @@ -from setuptools import setup, find_packages import os +from setuptools import find_packages, setup + this_directory = os.path.abspath(os.path.dirname(__file__)) -with open(os.path.join(this_directory, 'README.md'), encoding='utf-8') as f: +with open(os.path.join(this_directory, "README.md"), encoding="utf-8") as f: long_description = f.read() - setup( - name='trustauthx', - version='0.7.2', - description='Official connector SDK for TrustAuthx', + name="trustauthx", + version="0.7.2", + description="Official connector SDK for TrustAuthx", long_description=long_description, - long_description_content_type='text/markdown', # This is important! - author='moonlightnexus', - author_email='nexus@trustauthx.com', + long_description_content_type="text/markdown", # This is important! + author="moonlightnexus", + author_email="nexus@trustauthx.com", url="https://github.com/One-Click-Auth/TrustAuthx-Py-SDK.git", license="MIT", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3.9', - ], + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.9", + ], packages=find_packages(), install_requires=[ "certifi>=2023.5.7", @@ -37,11 +37,11 @@ "urllib3<=3.0.0", "charset-normalizer>=3.2.0", "python-jose>=3.3.0", - "python-dotenv==1.0.0" - ], + "python-dotenv==1.0.0", + ], entry_points={ - 'console_scripts': [ - 'trustauthx = trustauthx.cli:main', + "console_scripts": [ + "trustauthx = trustauthx.cli:main", ], }, ) diff --git a/trustauthx/authlite.py b/trustauthx/authlite.py index 329e44a..9a670af 100644 --- a/trustauthx/authlite.py +++ b/trustauthx/authlite.py @@ -1,13 +1,16 @@ -import requests -from requests.exceptions import HTTPError -from jose import JWTError, jwt -from jose.constants import ALGORITHMS import json import sqlite3 -from .scheme import * -from functools import wraps import threading from dataclasses import asdict +from functools import wraps + +import requests +from jose import JWTError, jwt +from jose.constants import ALGORITHMS +from requests.exceptions import HTTPError + +from .scheme import * + class _EdgeDBRoleQuery: """ @@ -32,6 +35,7 @@ class _EdgeDBRoleQuery: count_roles(self): Returns the number of roles stored. """ + total_roles = 0 roles = None @@ -45,19 +49,26 @@ def __init__(self, roles, in_memory=True): """ self.in_memory = in_memory if self.in_memory: - self.__class__.roles = {role['rol_id']: role['permissions'][0] for role in roles} + self.__class__.roles = { + role["rol_id"]: role["permissions"][0] for role in roles + } else: - self.conn = sqlite3.connect(':memory:') # replace ':memory:' with your database path + # replace ':memory:' with your database path + self.conn = sqlite3.connect(":memory:") self.cursor = self.conn.cursor() - self.cursor.execute(""" + self.cursor.execute( + """ CREATE TABLE IF NOT EXISTS roles ( role_id TEXT PRIMARY KEY, permissions TEXT ) - """) + """ + ) for role in roles: for role_id, permissions in role.items(): - self.cursor.execute("INSERT INTO roles VALUES (?, ?)", (role_id, permissions)) + self.cursor.execute( + "INSERT INTO roles VALUES (?, ?)", (role_id, permissions) + ) self.conn.commit() self.count_roles() @@ -78,21 +89,33 @@ def query(self, role_id=None, permission_key=None): elif role_id: return self.__class__.roles.get(role_id, None) elif permission_key: - return {role_id: permissions[permission_key] for role_id, permissions in self.__class__.roles.items() if permission_key in permissions} + return { + role_id: permissions[permission_key] + for role_id, permissions in self.__class__.roles.items() + if permission_key in permissions + } else: return self.__class__.roles else: if role_id and permission_key: - self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + self.cursor.execute( + "SELECT permissions FROM roles WHERE role_id = ?", (role_id,) + ) permissions = self.cursor.fetchone() if permissions: return permissions[0].get(permission_key, None) elif role_id: - self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + self.cursor.execute( + "SELECT permissions FROM roles WHERE role_id = ?", (role_id,) + ) return self.cursor.fetchone() elif permission_key: self.cursor.execute("SELECT * FROM roles") - return {role_id: permissions[permission_key] for role_id, permissions in self.cursor.fetchall() if permission_key in permissions} + return { + role_id: permissions[permission_key] + for role_id, permissions in self.cursor.fetchall() + if permission_key in permissions + } else: self.cursor.execute("SELECT * FROM roles") return self.cursor.fetchall() @@ -110,9 +133,14 @@ def validate(self, role_id, permission_key, permission_val): bool: True if the permission value matches the expected value, False otherwise. """ if self.in_memory: - return self.__class__.roles.get(role_id, {}).get(permission_key, None) == permission_val + return ( + self.__class__.roles.get(role_id, {}).get(permission_key, None) + == permission_val + ) else: - self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + self.cursor.execute( + "SELECT permissions FROM roles WHERE role_id = ?", (role_id,) + ) permissions = self.cursor.fetchone() if permissions: return permissions[0].get(permission_key, None) == permission_val @@ -138,18 +166,21 @@ def count_roles(self): def reinitialize_all(foreground=True): if foreground: for instance in AuthLiteClient.instances: - instance:AuthLiteClient = instance + instance: AuthLiteClient = instance instance._re_init_roles() else: + def target(): for instance in AuthLiteClient.instances: - instance:AuthLiteClient = instance + instance: AuthLiteClient = instance instance._re_init_roles() + thread = threading.Thread(target=target) thread.start() @staticmethod def _EDGE_Wrapper(func): + @wraps(func) def wrapper(*args, **kwargs): # Call the function @@ -158,14 +189,17 @@ def wrapper(*args, **kwargs): x_edge = response.headers.get("X-EDGE") if x_edge: if int(x_edge) != _EdgeDBRoleQuery.total_roles: - _EdgeDBRoleQuery.reinitialize_all() # Add data + _EdgeDBRoleQuery.reinitialize_all() # Add data return response + return wrapper + requests.get = _EdgeDBRoleQuery._EDGE_Wrapper(requests.get) requests.post = _EdgeDBRoleQuery._EDGE_Wrapper(requests.post) requests.delete = _EdgeDBRoleQuery._EDGE_Wrapper(requests.delete) + class _Roles(_EdgeDBRoleQuery): """ A class for managing roles and permissions in the EdgeDB system. @@ -194,9 +228,19 @@ class _Roles(_EdgeDBRoleQuery): delete_permission(self, rol_id, **Permission_): Deletes a permission from a role with the specified role ID. """ + instances = [] - def __init__(self, roles, org_id, api_key, signed_key, secret_key, API_BASE_URL, InMemory=True): + def __init__( + self, + roles, + org_id, + api_key, + signed_key, + secret_key, + API_BASE_URL, + InMemory=True, + ): """ Initializes the _Roles instance. @@ -225,7 +269,7 @@ def get_all_roles(self) -> GetAllRolesResponse: Returns: roles_list = List[Role]: A list of Role objects representing the roles and their permissions.roles roles_json_list = List[dict]: A list of dict representing the roles and their permissions - + demo response ==> [ { "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", @@ -260,18 +304,20 @@ def get_all_roles(self) -> GetAllRolesResponse: ] } ]""" - url = f'{self.API_BASE_URL}/rbac/role' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._signed_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._signed_key}", } response = requests.get(url, headers=headers, params=params) roles = [Role(**role_data) for role_data in response.json()] - return GetAllRolesResponse(roles_list=roles, roles_json_list=[asdict(role) for role in roles]) + return GetAllRolesResponse( + roles_list=roles, roles_json_list=[asdict(role) for role in roles] + ) - def add_role(self, name, **Permission_)->AddRoleResponse: + def add_role(self, name, **Permission_) -> AddRoleResponse: """ Adds a new role with the specified name and permissions. @@ -281,7 +327,7 @@ def add_role(self, name, **Permission_)->AddRoleResponse: Returns: AddRoleResponse: An AddRoleResponse object representing the newly created role. - + demo response ==> { "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", "rol_id": "rol_rce_474ae9e59b3d49ce", @@ -298,34 +344,28 @@ def add_role(self, name, **Permission_)->AddRoleResponse: } ] }""" - url = f'{self.API_BASE_URL}/rbac/role' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._signed_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._signed_key}", } permissions = [{k: v} for k, v in Permission_.items()] - data = { - "org_id": f'{self.org_id}', - "name": name, - "permissions": permissions - } - response = requests.post(url, headers=headers, params=params, data=json.dumps(data)) + data = {"org_id": f"{self.org_id}", "name": name, "permissions": permissions} + response = requests.post( + url, headers=headers, params=params, data=json.dumps(data) + ) role_data = response.json() permissions = [Permission(**p) for p in role_data.get("permissions", [])] return AddRoleResponse( org_id=role_data.get("org_id"), rol_id=role_data.get("rol_id"), name=role_data.get("name"), - permissions=[p.__dict__ for p in permissions] + permissions=[p.__dict__ for p in permissions], ) def delete_role(self, rol_id) -> DeleteRoleResponse: - """ Deletes a role with the specified role ID. @@ -334,7 +374,7 @@ def delete_role(self, rol_id) -> DeleteRoleResponse: Returns: DeleteRoleResponse: A DeleteRoleResponse object representing the deleted role. - + demo response ==> { "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", "rol_id": "rol_YHV_78ae9006bcaa4c77", @@ -351,31 +391,29 @@ def delete_role(self, rol_id) -> DeleteRoleResponse: } ] }""" - url = f'{self.API_BASE_URL}/rbac/role' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._signed_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._signed_key}", } - data = { - "org_id": f'{self.org_id}', - "rol_id": rol_id - } - response = requests.delete(url, headers=headers, params=params, data=json.dumps(data)) + data = {"org_id": f"{self.org_id}", "rol_id": rol_id} + response = requests.delete( + url, headers=headers, params=params, data=json.dumps(data) + ) role_data = response.json() permissions = [Permission(**p) for p in role_data.get("permissions", [])] return DeleteRoleResponse( org_id=role_data.get("org_id"), rol_id=role_data.get("rol_id"), name=role_data.get("name"), - permissions=[p.__dict__ for p in permissions] + permissions=[p.__dict__ for p in permissions], ) - def add_permission(self, rol_id, foreground=False, **Permission_) -> AddPermissionResponse: + def add_permission( + self, rol_id, foreground=False, **Permission_ + ) -> AddPermissionResponse: """ Adds a new permission to a role with the specified role ID. @@ -385,7 +423,7 @@ def add_permission(self, rol_id, foreground=False, **Permission_) -> AddPermissi Returns: AddPermissionResponse: An AddPermissionResponse object representing the added permission. - + demo response ==> { "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", "rol_id": "rol_rce_474ae9e59b3d49ce", @@ -395,33 +433,34 @@ def add_permission(self, rol_id, foreground=False, **Permission_) -> AddPermissi } ] }""" - url = f'{self.API_BASE_URL}/rbac/permission' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + url = f"{self.API_BASE_URL}/rbac/permission" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._signed_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._signed_key}", } permissions = [{k: v} for k, v in Permission_.items()] data = { - "org_id": f'{self.org_id}', + "org_id": f"{self.org_id}", "rol_id": rol_id, - "permissions": permissions + "permissions": permissions, } - response = requests.post(url, headers=headers, params=params, data=json.dumps(data)) + response = requests.post( + url, headers=headers, params=params, data=json.dumps(data) + ) response_data = response.json() permissions = [Permission(**{k: v}) for k, v in permissions.items()] self.reinitialize_all(foreground) return AddPermissionResponse( org_id=response_data.get("org_id"), rol_id=response_data.get("rol_id"), - permissions=[p.__dict__ for p in permissions] + permissions=[p.__dict__ for p in permissions], ) - def delete_permission(self, rol_id, foreground=False, **Permission_) -> DeletePermissionResponse: + def delete_permission( + self, rol_id, foreground=False, **Permission_ + ) -> DeletePermissionResponse: """ Deletes a permission from a role with the specified role ID. @@ -431,7 +470,7 @@ def delete_permission(self, rol_id, foreground=False, **Permission_) -> DeletePe Returns: DeletePermissionResponse: A DeletePermissionResponse object representing the role with the deleted permission. - + demo response ==> { "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", "rol_id": "rol_rce_474ae9e59b3d49ce", @@ -446,30 +485,29 @@ def delete_permission(self, rol_id, foreground=False, **Permission_) -> DeletePe "maintainer": "administration" } ] - }""" #return full - url = f'{self.API_BASE_URL}/rbac/permission' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + }""" # return full + url = f"{self.API_BASE_URL}/rbac/permission" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._signed_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._signed_key}", } permissions = [{k: v} for k, v in Permission_.items()] data = { - "org_id": f'{self.org_id}', + "org_id": f"{self.org_id}", "rol_id": rol_id, - "permissions": permissions + "permissions": permissions, } - response = requests.delete(url, headers=headers, params=params, data=json.dumps(data)) + response = requests.delete( + url, headers=headers, params=params, data=json.dumps(data) + ) self.reinitialize_all(foreground) return response.json() - -class AuthLiteClient(): - instances = [] + +class AuthLiteClient: + instances = [] """ AuthLiteClient is a Python client for the TrustAuthX authentication service. @@ -525,49 +563,70 @@ class TokenCheck: refresh (str): The refresh token. state (bool): The state of the tokens (True if valid, False otherwise). """ - access :str - refresh:str - state:bool - def __init__(self, api_key, secret_key, org_id=None, API_BASE_URL="https://api.trustauthx.com", in_memory=True): + access: str + refresh: str + state: bool + + def __init__( + self, + api_key, + secret_key, + org_id=None, + API_BASE_URL="https://api.trustauthx.com", + in_memory=True, + ): """ - Initializes the AuthLiteClient instance. + Initializes the AuthLiteClient instance. + + Args: + api_key (str): The API key used for authentication. + secret_key (str): The secret key used for JWT encoding. + org_id (str, optional): The organization ID for generating authentication URLs. + API_BASE_URL (str, optional): The base URL for the API. Defaults to "https://api.trustauthx.com". + in_memory (bool, optional): Flag indicating whether to store the roles in-memory or in a SQLite database. Defaults to True (ie. in-memory). - Args: - api_key (str): The API key used for authentication. - secret_key (str): The secret key used for JWT encoding. - org_id (str, optional): The organization ID for generating authentication URLs. - API_BASE_URL (str, optional): The base URL for the API. Defaults to "https://api.trustauthx.com". - in_memory (bool, optional): Flag indicating whether to store the roles in-memory or in a SQLite database. Defaults to True (ie. in-memory). - - """ - self.jwt_encode = lambda key, data: jwt.encode(data, key=key, algorithm= ALGORITHMS.HS256) - self.jwt_decode = lambda key, data: jwt.decode(str(data), key=key, algorithms=ALGORITHMS.HS256) + """ + self.jwt_encode = lambda key, data: jwt.encode( + data, key=key, algorithm=ALGORITHMS.HS256 + ) + self.jwt_decode = lambda key, data: jwt.decode( + str(data), key=key, algorithms=ALGORITHMS.HS256 + ) self._secret_key = secret_key self._api_key = api_key self.org_id = org_id - self._signed_key = self.jwt_encode(key=self._secret_key, data={"api_key":self._api_key}) + self._signed_key = self.jwt_encode( + key=self._secret_key, data={"api_key": self._api_key} + ) self.API_BASE_URL = API_BASE_URL self.in_memory = in_memory - self.Roles: _Roles = _Roles(roles=self._set_edge_roles(), org_id=self.org_id, - api_key=self._api_key, signed_key=self._signed_key, - secret_key=self._secret_key, API_BASE_URL=self.API_BASE_URL, - InMemory=in_memory) + self.Roles: _Roles = _Roles( + roles=self._set_edge_roles(), + org_id=self.org_id, + api_key=self._api_key, + signed_key=self._signed_key, + secret_key=self._secret_key, + API_BASE_URL=self.API_BASE_URL, + InMemory=in_memory, + ) self.__class__.instances.append(self) - + def generate_url(self) -> str: """ Generates an authentication URL for the given organization. Returns: str: The generated authentication URL. - + Raises: ValueError: If org_id is not provided. """ # Generate an authentication url for the given org - if self.org_id:return f"https://app.trustauthx.com/widget/login/?org_id={self.org_id}" - else:raise ValueError("must provide org_id") + if self.org_id: + return f"https://app.trustauthx.com/widget/login/?org_id={self.org_id}" + else: + raise ValueError("must provide org_id") def generate_edit_user_url(self, access_token, url) -> str: """ @@ -581,15 +640,15 @@ def generate_edit_user_url(self, access_token, url) -> str: str: The generated authentication URL. """ # Generate an authentication url for the given org - headers = {'accept': 'application/json'} + headers = {"accept": "application/json"} params = { - 'AccessToken': access_token, - 'api_key': self._api_key, - 'signed_key': self._signed_key, - 'url':url - } + "AccessToken": access_token, + "api_key": self._api_key, + "signed_key": self._signed_key, + "url": url, + } url = f"{self.API_BASE_URL}/api/user/me/settings/" - req = requests.Request('GET', url, params=params, headers=headers).prepare() + req = requests.Request("GET", url, params=params, headers=headers).prepare() return req.url def re_auth(self, code): @@ -608,23 +667,24 @@ def re_auth(self, code): url = f"{self.API_BASE_URL}/api/user/me/widget/re-auth/token" params = { "code": code, - 'api_key': self._api_key, - 'signed_key': self._signed_key + "api_key": self._api_key, + "signed_key": self._signed_key, } headers = {"accept": "application/json"} response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self._secret_key,response.json()) + rtn = self.jwt_decode(self._secret_key, response.json()) sub = json.loads(rtn["sub"]) rtn.pop("sub") rtn["email"] = sub["email"] rtn["uid"] = sub["uid"] return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def get_user(self, token) -> dict: """ @@ -640,26 +700,27 @@ def get_user(self, token) -> dict: HTTPError: If the request fails with an HTTP error status code. """ # Validate the given authentication token - url = f'{self.API_BASE_URL}/api/user/me/auth/data' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/auth/data" + headers = {"accept": "application/json"} params = { - 'UserToken': token, - 'api_key': self._api_key, - 'signed_key': self._signed_key - } + "UserToken": token, + "api_key": self._api_key, + "signed_key": self._signed_key, + } response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self._secret_key,response.json()) + rtn = self.jwt_decode(self._secret_key, response.json()) sub = json.loads(rtn["sub"]) rtn.pop("sub") rtn["email"] = sub["email"] rtn["uid"] = sub["uid"] return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def get_user_data(self, AccessToken) -> dict: """ @@ -676,22 +737,23 @@ def get_user_data(self, AccessToken) -> dict: """ # Validate the given authentication token """returns a dict containing 'access_token', 'refresh_token', 'img', 'sub'""" - url = f'{self.API_BASE_URL}/api/user/me/data' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/data" + headers = {"accept": "application/json"} params = { - 'AccessToken': AccessToken, - 'api_key': self._api_key, - 'signed_key': self._signed_key - } + "AccessToken": AccessToken, + "api_key": self._api_key, + "signed_key": self._signed_key, + } response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self._secret_key,response.json()) + rtn = self.jwt_decode(self._secret_key, response.json()) return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def get_access_token_from_refresh_token(self, refresh_token): """ @@ -707,20 +769,22 @@ def get_access_token_from_refresh_token(self, refresh_token): HTTPError: If the request fails with an HTTP error status code. """ # Store the given authentication token - url = f'{self.API_BASE_URL}/api/user/me/access/token/' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/access/token/" + headers = {"accept": "application/json"} params = { - 'RefreshToken': refresh_token, - 'api_key': self._api_key, - 'signed_key': self._signed_key - } + "RefreshToken": refresh_token, + "api_key": self._api_key, + "signed_key": self._signed_key, + } response = requests.get(url, headers=headers, params=params) - if response.status_code == 200:return response.json() - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + if response.status_code == 200: + return response.json() + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def validate_access_token(self, access_token) -> bool: """ @@ -733,17 +797,22 @@ def validate_access_token(self, access_token) -> bool: bool: True if the access token is valid, False otherwise. """ # Store the given authentication token - url = f'{self.API_BASE_URL}/api/user/me/auth/validate/token' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/auth/validate/token" + headers = {"accept": "application/json"} params = { - 'AccessToken': access_token, - 'api_key': self._api_key, - 'signed_key': self._signed_key - } + "AccessToken": access_token, + "api_key": self._api_key, + "signed_key": self._signed_key, + } response = requests.get(url, headers=headers, params=params) return response.status_code == 200 - def revoke_token(self,AccessToken:str=None, RefreshToken:str = None, revoke_all_tokens:bool = False) -> bool: + def revoke_token( + self, + AccessToken: str = None, + RefreshToken: str = None, + revoke_all_tokens: bool = False, + ) -> bool: """ Revokes an access token or refresh token. @@ -759,22 +828,28 @@ def revoke_token(self,AccessToken:str=None, RefreshToken:str = None, revoke_all_ HTTPError: If the request fails with an HTTP error status code. AttributeError: If neither AccessToken nor RefreshToken is provided. """ - url = f'{self.API_BASE_URL}/api/user/me/token/' - headers = {'accept': 'application/json'} - if not AccessToken and not RefreshToken:raise AttributeError("must provide either AccessToken or RefreshToken") - tt=True if AccessToken else False + url = f"{self.API_BASE_URL}/api/user/me/token/" + headers = {"accept": "application/json"} + if not AccessToken and not RefreshToken: + raise AttributeError("must provide either AccessToken or RefreshToken") + tt = True if AccessToken else False t = AccessToken if AccessToken else RefreshToken params = { - 'Token': t, - 'api_key': self._api_key, - 'signed_key': self._signed_key, - 'AccessToken': tt, - 'SpecificTokenOnly':not revoke_all_tokens, - } + "Token": t, + "api_key": self._api_key, + "signed_key": self._signed_key, + "AccessToken": tt, + "SpecificTokenOnly": not revoke_all_tokens, + } response = requests.delete(url, headers=headers, params=params) - if response.status_code == 200:return response.json() - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format(response.status_code, response.text)) + if response.status_code == 200: + return response.json() + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def validate_token_set(self, access_token, refresh_token) -> TokenCheck: """ @@ -797,8 +872,8 @@ def validate_token_set(self, access_token, refresh_token) -> TokenCheck: if refresh_token: new_tokens = self.get_access_token_from_refresh_token(refresh_token) d.state = False - d.access = new_tokens['access_token'] - d.refresh = new_tokens['refresh_token'] + d.access = new_tokens["access_token"] + d.refresh = new_tokens["refresh_token"] return d else: d.state = True @@ -806,30 +881,37 @@ def validate_token_set(self, access_token, refresh_token) -> TokenCheck: d.refresh = refresh_token return d except: - raise HTTPError('both tokens are invalid login again') - + raise HTTPError("both tokens are invalid login again") + def _set_edge_roles(self) -> list: # self.Roles - url = f'{self.API_BASE_URL}/rbac/role' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._signed_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._signed_key}", } response = requests.get(url, headers=headers, params=params) roles = [Role(**role_data) for role_data in response.json()] - roles = GetAllRolesResponse(roles_list=roles, roles_json_list=[asdict(role) for role in roles]) + roles = GetAllRolesResponse( + roles_list=roles, roles_json_list=[asdict(role) for role in roles] + ) # print(roles.roles_json_list) return roles.roles_json_list def _re_init_roles(self) -> _Roles: - self.Roles: _Roles = _Roles(roles=self._set_edge_roles(), org_id=self.org_id, - api_key=self._api_key, signed_key=self._signed_key, - secret_key=self._secret_key, API_BASE_URL=self.API_BASE_URL, - InMemory=self.in_memory) + self.Roles: _Roles = _Roles( + roles=self._set_edge_roles(), + org_id=self.org_id, + api_key=self._api_key, + signed_key=self._signed_key, + secret_key=self._secret_key, + API_BASE_URL=self.API_BASE_URL, + InMemory=self.in_memory, + ) return self.Roles - + def attach_role(self, uid, signoff_session_and_assign, refresh_token, access_token): # add this and test pass @@ -840,4 +922,4 @@ def remove_role(self, uid, signoff_session_and_assign, refresh_token, access_tok def update_role(self, uid, signoff_session_and_assign, refresh_token, access_token): # add this and test - pass \ No newline at end of file + pass From c725684e59127f93fb41eabae1a90dec0285e487 Mon Sep 17 00:00:00 2001 From: moonlightnexus Date: Fri, 5 Apr 2024 21:48:53 +0530 Subject: [PATCH 2/5] added respinse class dict function to each class rest in progress --- trustauthx/authlite.py | 7 ++++--- trustauthx/scheme.py | 36 ++++++++++++++++++++++++++++++++++-- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/trustauthx/authlite.py b/trustauthx/authlite.py index 329e44a..f464492 100644 --- a/trustauthx/authlite.py +++ b/trustauthx/authlite.py @@ -466,7 +466,7 @@ def delete_permission(self, rol_id, foreground=False, **Permission_) -> DeletePe response = requests.delete(url, headers=headers, params=params, data=json.dumps(data)) self.reinitialize_all(foreground) return response.json() - + class AuthLiteClient(): instances = [] @@ -626,7 +626,7 @@ def re_auth(self, code): response.text) ) - def get_user(self, token) -> dict: + def get_user(self, token, return_class=False) -> User|dict: """ Validates the given authentication token and returns user data. @@ -654,7 +654,8 @@ def get_user(self, token) -> dict: rtn.pop("sub") rtn["email"] = sub["email"] rtn["uid"] = sub["uid"] - return rtn + if not return_class:return User(rtn).to_dict() + else: return User(rtn) else:raise HTTPError( 'Request failed with status code : {} \n this code contains a msg : {}'.format( response.status_code, diff --git a/trustauthx/scheme.py b/trustauthx/scheme.py index 7771c7a..547161c 100644 --- a/trustauthx/scheme.py +++ b/trustauthx/scheme.py @@ -9,7 +9,9 @@ class Permission: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) - + + def to_dict(self): + return asdict(self) @dataclass class Role: """ @@ -26,12 +28,15 @@ class Role: name: str permissions: List[Permission] - + def to_dict(self): + return asdict(self) @dataclass class GetAllRolesResponse: roles_list: List[Role] roles_json_list: List[Dict[str, Union[str, List[Dict[str, str]]]]] + def to_dict(self): + return asdict(self) @dataclass class AddRoleResponse: org_id: str @@ -39,6 +44,8 @@ class AddRoleResponse: name: str permissions: List[Permission] + def to_dict(self): + return asdict(self) @dataclass class DeleteRoleResponse: org_id: str @@ -46,19 +53,44 @@ class DeleteRoleResponse: name: str permissions: List[Permission] + def to_dict(self): + return asdict(self) @dataclass class AddPermissionResponse: org_id: str rol_id: str permissions: List[Dict[str, str]] + def to_dict(self): + return asdict(self) + @dataclass class DeletePermissionResponse: org_id: str rol_id: str permissions: List[Permission] + def to_dict(self): + return asdict(self) + +@dataclass +class User: + iss: str + jti: str + access_token: str + type: str + exp: float + refresh_token: str + refresh_exp: int + scope: List[str] + img: str + name: str + iat: int + email: str + uid: str + def to_dict(self): + return asdict(self) """# Demo data demo_get_all_roles_response = GetAllRolesResponse(roles=[ From 6eb5b440470f653865c91818343ea981c60fbe8f Mon Sep 17 00:00:00 2001 From: moonlightnexus Date: Fri, 5 Apr 2024 22:16:40 +0530 Subject: [PATCH 3/5] "Add methods to attach, remove, and update roles in AuthLiteClient class; add error checking for role ID types" This commit adds new methods to the AuthLiteClient class for attaching, removing, and updating roles. It also includes error checking to ensure that the role IDs are of the correct type. The methods make API requests to the AuthLite server to assign or remove roles for a user. The 'signoff\_session\_and\_assign', 'refresh\_token', and 'access\_token' parameters are included to authenticate the requests. --- trustauthx/authlite.py | 104 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 95 insertions(+), 9 deletions(-) diff --git a/trustauthx/authlite.py b/trustauthx/authlite.py index c2004fd..2796f51 100644 --- a/trustauthx/authlite.py +++ b/trustauthx/authlite.py @@ -1,4 +1,5 @@ import json +from lib2to3.pgen2.parse import ParseError import sqlite3 import threading from dataclasses import asdict @@ -911,14 +912,99 @@ def _re_init_roles(self) -> _Roles: ) return self.Roles - def attach_role(self, uid, signoff_session_and_assign, refresh_token, access_token): - # add this and test - pass + def attach_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=False, refresh_token=None, access_token=None): + if signoff_session_and_assign: + if not refresh_token or not access_token: + raise ParseError( + "must parse refresh_token and access_token if signoff_session_and_assign is true" + ) + url = f"{self.API_BASE_URL}/rbac/assign/permission" + headers = { + "accept": "application/json", + "Content-Type": "application/json", + } + params = { + "org_id": self.org_id, + "api_key": self._api_key, + "signed_key": self._signed_key, + } + rols = [] + if isinstance(rol_ids) == str: rols.append(rol_ids) + elif isinstance(rol_ids) == list: rols = [i for i in rol_ids] + else:raise TypeError() + data = { + "uid": uid, + "rol_id": rol_ids, + "inplace": [], + "signoff_session_and_assign": signoff_session_and_assign, + "AccessToken": access_token, + "RefreshToken": refresh_token, + } + response = requests.post(url, headers=headers, params=params, json=data) + return response.json() - def remove_role(self, uid, signoff_session_and_assign, refresh_token, access_token): - # add this and test - pass + def remove_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=False, refresh_token=None, access_token=None): + if signoff_session_and_assign: + if not refresh_token or not access_token: + raise ParseError( + "must parse refresh_token and access_token if signoff_session_and_assign is true" + ) + url = f"{self.API_BASE_URL}/rbac/assign/permission" + headers = { + "accept": "application/json", + "Content-Type": "application/json", + } + params = { + "org_id": self.org_id, + "api_key": self._api_key, + "signed_key": self._signed_key, + } + rols = [] + if isinstance(rol_ids) == str: rols.append(rol_ids) + elif isinstance(rol_ids) == list: rols = [i for i in rol_ids] + else:raise TypeError() + data = { + "uid": uid, + "rol_id": [], + "inplace": rol_ids, + "signoff_session_and_assign": signoff_session_and_assign, + "AccessToken": access_token, + "RefreshToken": refresh_token, + } + response = requests.post(url, headers=headers, params=params, json=data) + return response.json() - def update_role(self, uid, signoff_session_and_assign, refresh_token, access_token): - # add this and test - pass + def update_role(self, uid:str, rol_ids_to_add:str|list, rol_ids_to_remove:str|list, signoff_session_and_assign=False, refresh_token=None, access_token=None): + if signoff_session_and_assign: + if not refresh_token or not access_token: + raise ParseError( + "must parse refresh_token and access_token if signoff_session_and_assign is true" + ) + url = f"{self.API_BASE_URL}/rbac/assign/permission" + headers = { + "accept": "application/json", + "Content-Type": "application/json", + } + params = { + "org_id": self.org_id, + "api_key": self._api_key, + "signed_key": self._signed_key, + } + rols_add = [] + if isinstance(rol_ids_to_add) == str: rols_add.append(rol_ids_to_add) + elif isinstance(rol_ids_to_add) == list: rols_add = [i for i in rol_ids_to_add] + else:raise TypeError() + rols_rem = [] + if isinstance(rol_ids_to_remove) == str: rols_rem.append(rol_ids_to_remove) + elif isinstance(rol_ids_to_remove) == list: rols_rem = [i for i in rol_ids_to_remove] + else:raise TypeError() + data = { + "uid": uid, + "rol_id": rols_add, + "inplace": rols_rem, + "signoff_session_and_assign": signoff_session_and_assign, + "AccessToken": access_token, + "RefreshToken": refresh_token, + } + response = requests.post(url, headers=headers, params=params, json=data) + return response.json() From 669e7abb9b12d31d6784bb4df455a71a6481c983 Mon Sep 17 00:00:00 2001 From: moonlightnexus Date: Fri, 5 Apr 2024 23:19:35 +0530 Subject: [PATCH 4/5] Added `return_class` parameter to `attach_role`, `remove_role`, and `update_role` methods in `AuthLiteClient` class to allow returning a class instance instead of a dictionary. Added `SignOffSessionReplace` class to represent the response from these methods when `return_class` is True. Raised `ParseError` when `signoff_session_and_assign` is True but `refresh_token` or `access_token` is not provided. --- trustauthx/authlite.py | 79 ++++++++++++++++++++++++++++++++++++++---- trustauthx/scheme.py | 10 ++++++ 2 files changed, 83 insertions(+), 6 deletions(-) diff --git a/trustauthx/authlite.py b/trustauthx/authlite.py index 2796f51..c81387c 100644 --- a/trustauthx/authlite.py +++ b/trustauthx/authlite.py @@ -912,7 +912,26 @@ def _re_init_roles(self) -> _Roles: ) return self.Roles - def attach_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=False, refresh_token=None, access_token=None): + def attach_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=False, + refresh_token=None, access_token=None, + return_class:bool=False) -> dict|SignOffSessionReplace: + """ + Attaches a role to a user. + + Args: + uid (str): The user ID to attach the role to. + rol_ids (str | list): The ID(s) of the role(s) to attach. + signoff_session_and_assign (bool, optional): Whether to sign off the session and assign. Default is False. + refresh_token (str, optional): The refresh token for authentication. + access_token (str, optional): The access token for authentication. + return_class (bool, optional): Whether to return a class instance. Default is False. + + Returns: + dict | SignOffSessionReplace: The response from the API, or a class instance if return_class is True. + + Raises: + ParseError: If signoff_session_and_assign is True but refresh_token or access_token is not provided. + """ if signoff_session_and_assign: if not refresh_token or not access_token: raise ParseError( @@ -941,9 +960,31 @@ def attach_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=Fals "RefreshToken": refresh_token, } response = requests.post(url, headers=headers, params=params, json=data) - return response.json() + if signoff_session_and_assign: return response.json() + else: + if return_class: return SignOffSessionReplace(response.json()) + else: return SignOffSessionReplace(response.json()).to_dict() + + def remove_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=False, + refresh_token=None, access_token=None, + return_class:bool=False) -> dict|SignOffSessionReplace: + """ + Removes a role from a user. - def remove_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=False, refresh_token=None, access_token=None): + Args: + uid (str): The user ID to remove the role from. + rol_ids (str | list): The ID(s) of the role(s) to remove. + signoff_session_and_assign (bool, optional): Whether to sign off the session and assign. Default is False. + refresh_token (str, optional): The refresh token for authentication. + access_token (str, optional): The access token for authentication. + return_class (bool, optional): Whether to return a class instance. Default is False. + + Returns: + dict | SignOffSessionReplace: The response from the API, or a class instance if return_class is True. + + Raises: + ParseError: If signoff_session_and_assign is True but refresh_token or access_token is not provided. + """ if signoff_session_and_assign: if not refresh_token or not access_token: raise ParseError( @@ -972,9 +1013,32 @@ def remove_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=Fals "RefreshToken": refresh_token, } response = requests.post(url, headers=headers, params=params, json=data) - return response.json() + if signoff_session_and_assign: return response.json() + else: + if return_class: return SignOffSessionReplace(response.json()) + else: return SignOffSessionReplace(response.json()).to_dict() + + def update_role(self, uid:str, rol_ids_to_add:str|list, rol_ids_to_remove:str|list, + signoff_session_and_assign=False, refresh_token=None, access_token=None, + return_class:bool=False) -> dict|SignOffSessionReplace: + """ + Updates a user's roles by adding and/or removing roles. + + Args: + uid (str): The user ID to update roles for. + rol_ids_to_add (str | list): The ID(s) of the role(s) to add. + rol_ids_to_remove (str | list): The ID(s) of the role(s) to remove. + signoff_session_and_assign (bool, optional): Whether to sign off the session and assign. Default is False. + refresh_token (str, optional): The refresh token for authentication. + access_token (str, optional): The access token for authentication. + return_class (bool, optional): Whether to return a class instance. Default is False. + + Returns: + dict | SignOffSessionReplace: The response from the API, or a class instance if return_class is True. - def update_role(self, uid:str, rol_ids_to_add:str|list, rol_ids_to_remove:str|list, signoff_session_and_assign=False, refresh_token=None, access_token=None): + Raises: + ParseError: If signoff_session_and_assign is True but refresh_token or access_token is not provided. + """ if signoff_session_and_assign: if not refresh_token or not access_token: raise ParseError( @@ -1007,4 +1071,7 @@ def update_role(self, uid:str, rol_ids_to_add:str|list, rol_ids_to_remove:str|li "RefreshToken": refresh_token, } response = requests.post(url, headers=headers, params=params, json=data) - return response.json() + if signoff_session_and_assign: return response.json() + else: + if return_class: return SignOffSessionReplace(response.json()) + else: return SignOffSessionReplace(response.json()).to_dict() \ No newline at end of file diff --git a/trustauthx/scheme.py b/trustauthx/scheme.py index 547161c..d872c21 100644 --- a/trustauthx/scheme.py +++ b/trustauthx/scheme.py @@ -91,6 +91,16 @@ class User: def to_dict(self): return asdict(self) + +@dataclass +class SignOffSessionReplace: + uid: str + access_token: str + refresh_token: str + role: List[str] + + def to_dict(self): + return asdict(self) """# Demo data demo_get_all_roles_response = GetAllRolesResponse(roles=[ From 1ee2756955196914f4e48d8a77855160d92e24c8 Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Fri, 5 Apr 2024 17:53:45 +0000 Subject: [PATCH 5/5] style: format code with Autopep8, Black, isort and Yapf This commit fixes the style issues introduced in 669e7ab according to the output from Autopep8, Black, isort and Yapf. Details: https://github.com/One-Click-Auth/TrustAuthx-Py-SDK/pull/55 --- trustauthx/authlite.py | 150 ++++++++++++++++++++++++++--------------- trustauthx/scheme.py | 25 +++++-- 2 files changed, 118 insertions(+), 57 deletions(-) diff --git a/trustauthx/authlite.py b/trustauthx/authlite.py index c81387c..3252ee1 100644 --- a/trustauthx/authlite.py +++ b/trustauthx/authlite.py @@ -1,9 +1,9 @@ import json -from lib2to3.pgen2.parse import ParseError import sqlite3 import threading from dataclasses import asdict from functools import wraps +from lib2to3.pgen2.parse import ParseError import requests from jose import JWTError, jwt @@ -506,6 +506,7 @@ def delete_permission( self.reinitialize_all(foreground) return response.json() + class AuthLiteClient: instances = [] """ @@ -686,7 +687,7 @@ def re_auth(self, code): ) ) - def get_user(self, token, return_class=False) -> User|dict: + def get_user(self, token, return_class=False) -> User | dict: """ Validates the given authentication token and returns user data. @@ -714,13 +715,16 @@ def get_user(self, token, return_class=False) -> User|dict: rtn.pop("sub") rtn["email"] = sub["email"] rtn["uid"] = sub["uid"] - if not return_class:return User(rtn).to_dict() - else: return User(rtn) - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + if not return_class: + return User(rtn).to_dict() + else: + return User(rtn) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def get_user_data(self, AccessToken) -> dict: """ @@ -912,9 +916,15 @@ def _re_init_roles(self) -> _Roles: ) return self.Roles - def attach_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=False, - refresh_token=None, access_token=None, - return_class:bool=False) -> dict|SignOffSessionReplace: + def attach_role( + self, + uid: str, + rol_ids: str | list, + signoff_session_and_assign=False, + refresh_token=None, + access_token=None, + return_class: bool = False, + ) -> dict | SignOffSessionReplace: """ Attaches a role to a user. @@ -933,10 +943,10 @@ def attach_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=Fals ParseError: If signoff_session_and_assign is True but refresh_token or access_token is not provided. """ if signoff_session_and_assign: - if not refresh_token or not access_token: + if not refresh_token or not access_token: raise ParseError( "must parse refresh_token and access_token if signoff_session_and_assign is true" - ) + ) url = f"{self.API_BASE_URL}/rbac/assign/permission" headers = { "accept": "application/json", @@ -945,12 +955,15 @@ def attach_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=Fals params = { "org_id": self.org_id, "api_key": self._api_key, - "signed_key": self._signed_key, + "signed_key": self._signed_key, } rols = [] - if isinstance(rol_ids) == str: rols.append(rol_ids) - elif isinstance(rol_ids) == list: rols = [i for i in rol_ids] - else:raise TypeError() + if isinstance(rol_ids) == str: + rols.append(rol_ids) + elif isinstance(rol_ids) == list: + rols = [i for i in rol_ids] + else: + raise TypeError() data = { "uid": uid, "rol_id": rol_ids, @@ -960,14 +973,23 @@ def attach_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=Fals "RefreshToken": refresh_token, } response = requests.post(url, headers=headers, params=params, json=data) - if signoff_session_and_assign: return response.json() - else: - if return_class: return SignOffSessionReplace(response.json()) - else: return SignOffSessionReplace(response.json()).to_dict() - - def remove_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=False, - refresh_token=None, access_token=None, - return_class:bool=False) -> dict|SignOffSessionReplace: + if signoff_session_and_assign: + return response.json() + else: + if return_class: + return SignOffSessionReplace(response.json()) + else: + return SignOffSessionReplace(response.json()).to_dict() + + def remove_role( + self, + uid: str, + rol_ids: str | list, + signoff_session_and_assign=False, + refresh_token=None, + access_token=None, + return_class: bool = False, + ) -> dict | SignOffSessionReplace: """ Removes a role from a user. @@ -986,10 +1008,10 @@ def remove_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=Fals ParseError: If signoff_session_and_assign is True but refresh_token or access_token is not provided. """ if signoff_session_and_assign: - if not refresh_token or not access_token: + if not refresh_token or not access_token: raise ParseError( "must parse refresh_token and access_token if signoff_session_and_assign is true" - ) + ) url = f"{self.API_BASE_URL}/rbac/assign/permission" headers = { "accept": "application/json", @@ -998,12 +1020,15 @@ def remove_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=Fals params = { "org_id": self.org_id, "api_key": self._api_key, - "signed_key": self._signed_key, + "signed_key": self._signed_key, } rols = [] - if isinstance(rol_ids) == str: rols.append(rol_ids) - elif isinstance(rol_ids) == list: rols = [i for i in rol_ids] - else:raise TypeError() + if isinstance(rol_ids) == str: + rols.append(rol_ids) + elif isinstance(rol_ids) == list: + rols = [i for i in rol_ids] + else: + raise TypeError() data = { "uid": uid, "rol_id": [], @@ -1013,14 +1038,24 @@ def remove_role(self, uid:str, rol_ids:str|list, signoff_session_and_assign=Fals "RefreshToken": refresh_token, } response = requests.post(url, headers=headers, params=params, json=data) - if signoff_session_and_assign: return response.json() - else: - if return_class: return SignOffSessionReplace(response.json()) - else: return SignOffSessionReplace(response.json()).to_dict() - - def update_role(self, uid:str, rol_ids_to_add:str|list, rol_ids_to_remove:str|list, - signoff_session_and_assign=False, refresh_token=None, access_token=None, - return_class:bool=False) -> dict|SignOffSessionReplace: + if signoff_session_and_assign: + return response.json() + else: + if return_class: + return SignOffSessionReplace(response.json()) + else: + return SignOffSessionReplace(response.json()).to_dict() + + def update_role( + self, + uid: str, + rol_ids_to_add: str | list, + rol_ids_to_remove: str | list, + signoff_session_and_assign=False, + refresh_token=None, + access_token=None, + return_class: bool = False, + ) -> dict | SignOffSessionReplace: """ Updates a user's roles by adding and/or removing roles. @@ -1040,10 +1075,10 @@ def update_role(self, uid:str, rol_ids_to_add:str|list, rol_ids_to_remove:str|li ParseError: If signoff_session_and_assign is True but refresh_token or access_token is not provided. """ if signoff_session_and_assign: - if not refresh_token or not access_token: + if not refresh_token or not access_token: raise ParseError( "must parse refresh_token and access_token if signoff_session_and_assign is true" - ) + ) url = f"{self.API_BASE_URL}/rbac/assign/permission" headers = { "accept": "application/json", @@ -1052,16 +1087,22 @@ def update_role(self, uid:str, rol_ids_to_add:str|list, rol_ids_to_remove:str|li params = { "org_id": self.org_id, "api_key": self._api_key, - "signed_key": self._signed_key, + "signed_key": self._signed_key, } rols_add = [] - if isinstance(rol_ids_to_add) == str: rols_add.append(rol_ids_to_add) - elif isinstance(rol_ids_to_add) == list: rols_add = [i for i in rol_ids_to_add] - else:raise TypeError() + if isinstance(rol_ids_to_add) == str: + rols_add.append(rol_ids_to_add) + elif isinstance(rol_ids_to_add) == list: + rols_add = [i for i in rol_ids_to_add] + else: + raise TypeError() rols_rem = [] - if isinstance(rol_ids_to_remove) == str: rols_rem.append(rol_ids_to_remove) - elif isinstance(rol_ids_to_remove) == list: rols_rem = [i for i in rol_ids_to_remove] - else:raise TypeError() + if isinstance(rol_ids_to_remove) == str: + rols_rem.append(rol_ids_to_remove) + elif isinstance(rol_ids_to_remove) == list: + rols_rem = [i for i in rol_ids_to_remove] + else: + raise TypeError() data = { "uid": uid, "rol_id": rols_add, @@ -1071,7 +1112,10 @@ def update_role(self, uid:str, rol_ids_to_add:str|list, rol_ids_to_remove:str|li "RefreshToken": refresh_token, } response = requests.post(url, headers=headers, params=params, json=data) - if signoff_session_and_assign: return response.json() - else: - if return_class: return SignOffSessionReplace(response.json()) - else: return SignOffSessionReplace(response.json()).to_dict() \ No newline at end of file + if signoff_session_and_assign: + return response.json() + else: + if return_class: + return SignOffSessionReplace(response.json()) + else: + return SignOffSessionReplace(response.json()).to_dict() diff --git a/trustauthx/scheme.py b/trustauthx/scheme.py index d872c21..f3b73c5 100644 --- a/trustauthx/scheme.py +++ b/trustauthx/scheme.py @@ -1,17 +1,21 @@ -from dataclasses import dataclass, asdict -from typing import List, Dict, Union +from dataclasses import asdict, dataclass +from typing import Dict, List, Union + @dataclass class Permission: """ A class representing a permission object. """ + def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) - + def to_dict(self): return asdict(self) + + @dataclass class Role: """ @@ -23,6 +27,7 @@ class Role: name (str): The name of the role. permissions (List[Permission]): A list of permissions associated with the role. """ + org_id: str rol_id: str name: str @@ -30,6 +35,8 @@ class Role: def to_dict(self): return asdict(self) + + @dataclass class GetAllRolesResponse: roles_list: List[Role] @@ -37,6 +44,8 @@ class GetAllRolesResponse: def to_dict(self): return asdict(self) + + @dataclass class AddRoleResponse: org_id: str @@ -46,6 +55,8 @@ class AddRoleResponse: def to_dict(self): return asdict(self) + + @dataclass class DeleteRoleResponse: org_id: str @@ -55,6 +66,8 @@ class DeleteRoleResponse: def to_dict(self): return asdict(self) + + @dataclass class AddPermissionResponse: org_id: str @@ -64,6 +77,7 @@ class AddPermissionResponse: def to_dict(self): return asdict(self) + @dataclass class DeletePermissionResponse: org_id: str @@ -73,6 +87,7 @@ class DeletePermissionResponse: def to_dict(self): return asdict(self) + @dataclass class User: iss: str @@ -91,7 +106,8 @@ class User: def to_dict(self): return asdict(self) - + + @dataclass class SignOffSessionReplace: uid: str @@ -102,6 +118,7 @@ class SignOffSessionReplace: def to_dict(self): return asdict(self) + """# Demo data demo_get_all_roles_response = GetAllRolesResponse(roles=[ {