Skip to content

Commit

Permalink
Add type annotations. Drop unused functions and re-exports.
Browse files Browse the repository at this point in the history
  • Loading branch information
nsoranzo committed Oct 18, 2023
1 parent 85587c0 commit 1dbf66f
Showing 1 changed file with 26 additions and 87 deletions.
113 changes: 26 additions & 87 deletions lib/tool_shed/util/shed_util_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import os
import socket
import string
from typing import TYPE_CHECKING
from typing import (
List,
TYPE_CHECKING,
)

from sqlalchemy import (
false,
Expand All @@ -15,15 +18,7 @@

from galaxy import util
from galaxy.tool_shed.util.shed_util_common import (
can_eliminate_repository_dependency,
clean_dependency_relationships,
generate_tool_guid,
get_ctx_rev,
get_next_prior_import_or_install_required_dict_entry,
get_tool_panel_config_tool_path_install_dir,
get_user,
have_shed_tool_conf_for_install,
set_image_paths,
)
from galaxy.util import (
checkers,
Expand All @@ -39,6 +34,7 @@

if TYPE_CHECKING:
from tool_shed.structured_app import ToolShedApp
from tool_shed.webapp.model import Repository

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -118,16 +114,7 @@ def get_category_by_name(app: "ToolShedApp", name: str):
return sa_session.scalars(stmt).first()


def get_repository_categories(app, id):
"""Get categories of a repository on the tool shed side from the database via id"""
sa_session = app.model.session
stmt = select(app.model.RepositoryCategoryAssociation).where(
app.model.RepositoryCategoryAssociation.repository_id == app.security.decode_id(id)
)
return sa_session.scalars(stmt).all()


def get_repository_file_contents(app, file_path, repository_id, is_admin=False):
def get_repository_file_contents(app: "ToolShedApp", file_path: str, repository_id: str, is_admin: bool = False) -> str:
"""Return the display-safe contents of a repository file for display in a browser."""
safe_str = ""
if not _is_path_browsable(app, file_path, repository_id, is_admin):
Expand Down Expand Up @@ -188,7 +175,7 @@ def get_repository_files(folder_path):
return contents


def get_repository_from_refresh_on_change(app, **kwd):
def get_repository_from_refresh_on_change(app: "ToolShedApp", **kwd):
# The changeset_revision_select_field in several grids performs a refresh_on_change which sends in request parameters like
# changeset_revison_1, changeset_revision_2, etc. One of the many select fields on the grid performed the refresh_on_change,
# so we loop through all of the received values to see which value is not the repository tip. If we find it, we know the
Expand All @@ -206,52 +193,14 @@ def get_repository_from_refresh_on_change(app, **kwd):
return v, None


def get_repository_type_from_tool_shed(app, tool_shed_url, name, owner):
"""
Send a request to the tool shed to retrieve the type for a repository defined by the
combination of a name and owner.
"""
tool_shed_url = common_util.get_tool_shed_url_from_tool_shed_registry(app, tool_shed_url)
params = dict(name=name, owner=owner)
pathspec = ["repository", "get_repository_type"]
repository_type = util.url_get(
tool_shed_url, auth=app.tool_shed_registry.url_auth(tool_shed_url), pathspec=pathspec, params=params
)
return repository_type


def get_tool_dependency_definition_metadata_from_tool_shed(app, tool_shed_url, name, owner):
"""
Send a request to the tool shed to retrieve the current metadata for a
repository of type tool_dependency_definition defined by the combination
of a name and owner.
"""
tool_shed_url = common_util.get_tool_shed_url_from_tool_shed_registry(app, tool_shed_url)
params = dict(name=name, owner=owner)
pathspec = ["repository", "get_tool_dependency_definition_metadata"]
metadata = util.url_get(
tool_shed_url, auth=app.tool_shed_registry.url_auth(tool_shed_url), pathspec=pathspec, params=params
)
return metadata


def get_tool_path_by_shed_tool_conf_filename(app, shed_tool_conf):
"""
Return the tool_path config setting for the received shed_tool_conf file by searching the tool box's in-memory list of shed_tool_confs for the
dictionary whose config_filename key has a value matching the received shed_tool_conf.
"""
for shed_tool_conf_dict in app.toolbox.dynamic_confs(include_migrated_tool_conf=True):
config_filename = shed_tool_conf_dict["config_filename"]
if config_filename == shed_tool_conf:
return shed_tool_conf_dict["tool_path"]
else:
file_name = basic_util.strip_path(config_filename)
if file_name == shed_tool_conf:
return shed_tool_conf_dict["tool_path"]
return None


def handle_email_alerts(app, host, repository, content_alert_str="", new_repo_alert=False, admin_only=False):
def handle_email_alerts(
app: "ToolShedApp",
host: str,
repository: "Repository",
content_alert_str: str = "",
new_repo_alert: bool = False,
admin_only: bool = False,
) -> None:
"""
There are 2 complementary features that enable a tool shed user to receive email notification:
Expand Down Expand Up @@ -353,7 +302,7 @@ def handle_email_alerts(app, host, repository, content_alert_str="", new_repo_al
log.exception("An error occurred sending a tool shed repository update alert by email.")


def _is_path_browsable(app, path, repository_id, is_admin=False):
def _is_path_browsable(app: "ToolShedApp", path: str, repository_id: str, is_admin: bool = False) -> bool:
"""
Detects whether the given path is browsable i.e. is within the
allowed repository folders. Admins can additionaly browse folders
Expand All @@ -364,7 +313,7 @@ def _is_path_browsable(app, path, repository_id, is_admin=False):
return is_path_within_repo(app, path, repository_id)


def is_path_within_dependency_dir(app, path):
def is_path_within_dependency_dir(app: "ToolShedApp", path: str) -> bool:
"""
Detect whether the given path is within the tool_dependency_dir folder on the disk.
(Specified by the config option). Use to filter malicious symlinks targeting outside paths.
Expand All @@ -378,7 +327,7 @@ def is_path_within_dependency_dir(app, path):
return allowed


def is_path_within_repo(app, path, repository_id):
def is_path_within_repo(app: "ToolShedApp", path: str, repository_id: str) -> bool:
"""
Detect whether the given path is within the repository folder on the disk.
Use to filter malicious symlinks targeting outside paths.
Expand All @@ -388,7 +337,9 @@ def is_path_within_repo(app, path, repository_id):
return os.path.commonprefix([repo_path, resolved_path]) == repo_path


def open_repository_files_folder(app, folder_path, repository_id, is_admin=False):
def open_repository_files_folder(
app: "ToolShedApp", folder_path: str, repository_id: str, is_admin: bool = False
) -> List:
"""
Return a list of dictionaries, each of which contains information for a file or directory contained
within a directory in a repository file hierarchy.
Expand Down Expand Up @@ -437,33 +388,21 @@ def tool_shed_is_this_tool_shed(toolshed_base_url, trans=None):
return cleaned_toolshed_base_url == cleaned_tool_shed


def get_users_with_repo_alert(session: scoped_session, user_model):
stmt = select(user_model).where(user_model.deleted == false()).where(user_model.new_repo_alert == true())
return session.scalars(stmt)


__all__ = (
"can_eliminate_repository_dependency",
"clean_dependency_relationships",
"count_repositories_in_category",
"generate_tool_guid",
"get_categories",
"get_category",
"get_category_by_name",
"get_ctx_rev",
"get_next_prior_import_or_install_required_dict_entry",
"get_repository_categories",
"get_repository_file_contents",
"get_repository_type_from_tool_shed",
"get_tool_dependency_definition_metadata_from_tool_shed",
"get_tool_panel_config_tool_path_install_dir",
"get_tool_path_by_shed_tool_conf_filename",
"get_user",
"handle_email_alerts",
"have_shed_tool_conf_for_install",
"is_path_within_dependency_dir",
"is_path_within_repo",
"open_repository_files_folder",
"set_image_paths",
"tool_shed_is_this_tool_shed",
)


def get_users_with_repo_alert(session: scoped_session, user_model):
stmt = select(user_model).where(user_model.deleted == false()).where(user_model.new_repo_alert == true())
return session.scalars(stmt)

0 comments on commit 1dbf66f

Please sign in to comment.