
Refactoring code profiler transform to new pythonic code layout
Signed-off-by: Pankaj Thorat <[email protected]>
pankajskku committed Jan 4, 2025
1 parent 6a06d87 commit 258a39d
Showing 834 changed files with 3,819 additions and 2,931 deletions.
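
Only a few of the 834 changed files are rendered below, but they show the two changes this layout refactor applies throughout: statements that were wrapped across several lines are collapsed onto one longer line, and the first-party `dpk_connector` imports are regrouped ahead of the `scrapy` imports. A minimal before/after sketch of the line-length change, using a call that appears in `crawler.py` (the `Settings` object here is built only for illustration):

```python
from scrapy.settings import Settings

settings = Settings()

# Old layout: the call is wrapped to fit a short line-length limit.
settings.set(
    "RANDOMIZE_DOWNLOAD_DELAY", True, priority="spider"
)

# New layout applied by this commit: the same call on a single longer line.
settings.set("RANDOMIZE_DOWNLOAD_DELAY", True, priority="spider")
```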
18 changes: 5 additions & 13 deletions data-connector-lib/src/dpk_connector/core/crawler.py
@@ -13,12 +13,12 @@
import threading
from typing import Any, Callable, Collection, Type, cast

from dpk_connector.core.utils import validate_domain, validate_url
from scrapy import Spider
from scrapy.crawler import Crawler, CrawlerRunner
from scrapy.settings import Settings
from twisted.internet.defer import Deferred

from dpk_connector.core.utils import validate_domain, validate_url

_lock = threading.Lock()
_reactor_initialized = False
@@ -61,9 +61,7 @@ def _create_crawler(self, spidercls: str | type[Spider]) -> Crawler:
with _lock:
global _reactor_initialized
init_reactor = not _reactor_initialized
crawler = Crawler(
cast(Type[Spider], spidercls), self.settings, init_reactor
)
crawler = Crawler(cast(Type[Spider], spidercls), self.settings, init_reactor)
_reactor_initialized = True
return crawler

@@ -140,19 +138,15 @@ def async_crawl(
if concurrent_requests < 1:
raise ValueError(f"Invalid concurrent requests {concurrent_requests}")
if concurrent_requests_per_domain < 1:
raise ValueError(
f"Invalid concurrent reuqests per domain {concurrent_requests_per_domain}"
)
raise ValueError(f"Invalid concurrent reuqests per domain {concurrent_requests_per_domain}")
if download_delay < 0:
raise ValueError(f"Invalid download delay {download_delay}")
if download_timeout < 0:
raise ValueError(f"Invalid donwload timeout {download_timeout}")
if autothrottle_max_delay < 0:
raise ValueError(f"Invalid autothrottle max delay {autothrottle_max_delay}")
if autothrottle_target_concurrency < 1:
raise ValueError(
f"Invalid autothrottle target concurrency {autothrottle_target_concurrency}"
)
raise ValueError(f"Invalid autothrottle target concurrency {autothrottle_target_concurrency}")
if robots_max_crawl_delay < 0:
raise ValueError(f"Invalid robots max crawl delay {robots_max_crawl_delay}")

@@ -178,9 +172,7 @@ def async_crawl(
priority="spider",
)
settings.set("DOWNLOAD_DELAY", download_delay, priority="spider")
settings.set(
"RANDOMIZE_DOWNLOAD_DELAY", randomize_download_delay, priority="spider"
)
settings.set("RANDOMIZE_DOWNLOAD_DELAY", randomize_download_delay, priority="spider")
settings.set("DOWNLOAD_TIMEOUT", download_timeout, priority="spider")
settings.set("AUTOTHROTTLE_ENABLED", autothrottle_enabled, priority="spider")
settings.set("AUTOTHROTTLE_MAX_DELAY", autothrottle_max_delay, priority="spider")
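
A note on the `_create_crawler` hunk above: the collapsed call keeps the module-level lock that lets only the first `Crawler` initialize the Twisted reactor. A standalone sketch of that initialize-once pattern, using an assumed generic `build` callable rather than the dpk_connector API:

```python
import threading

_lock = threading.Lock()
_reactor_initialized = False


def create_crawler(build):
    """Run `build`, asking only the very first caller to initialize the shared reactor."""
    global _reactor_initialized
    with _lock:
        init_reactor = not _reactor_initialized  # True only on the first call
        crawler = build(init_reactor)
        _reactor_initialized = True              # later callers reuse the running reactor
        return crawler


# The flag mirrors the `init_reactor` argument passed to scrapy's Crawler above.
print(create_crawler(lambda init: {"init_reactor": init}))  # {'init_reactor': True}
print(create_crawler(lambda init: {"init_reactor": init}))  # {'init_reactor': False}
```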
37 changes: 14 additions & 23 deletions data-connector-lib/src/dpk_connector/core/middlewares.py
@@ -13,6 +13,13 @@
import logging
from typing import Any, Generator, Iterable

from dpk_connector.core.item import ConnectorItem
from dpk_connector.core.utils import (
get_content_type,
get_etld1,
get_mime_type,
get_netloc,
)
from scrapy import Spider, signals
from scrapy.crawler import Crawler
from scrapy.downloadermiddlewares.robotstxt import RobotsTxtMiddleware
@@ -26,8 +33,6 @@
from scrapy.utils.python import to_unicode
from twisted.internet.defer import Deferred

from dpk_connector.core.item import ConnectorItem
from dpk_connector.core.utils import get_content_type, get_etld1, get_mime_type, get_netloc

logger = logging.getLogger(__name__)

@@ -48,9 +53,7 @@ def delay(self, user_agent: str | bytes) -> float | None:
if crawl_delay is None and request_rate is None:
return None
crawl_delay = crawl_delay or 0
request_rate = (
request_rate.seconds / request_rate.requests if request_rate else 0
)
request_rate = request_rate.seconds / request_rate.requests if request_rate else 0
delay = min(max(crawl_delay, request_rate), self.max_delay)
return delay

@@ -64,9 +67,7 @@ def __init__(self, crawler: Crawler, download_timeout: float):
super().__init__(crawler)
self.download_timeout = download_timeout
self._delays: dict[str, float] = {}
crawler.signals.connect(
self._request_reached_downloader, signal=signals.request_reached_downloader
)
crawler.signals.connect(self._request_reached_downloader, signal=signals.request_reached_downloader)

@classmethod
def from_crawler(cls, crawler: Crawler):
@@ -86,25 +87,19 @@ def _request_reached_downloader(self, request: Request, spider: Spider) -> None:
slot.delay = delay
slot.randomize_delay = False

def process_request_2(
self, rp: RobotParser, request: Request, spider: Spider
) -> None:
def process_request_2(self, rp: RobotParser, request: Request, spider: Spider) -> None:
super().process_request_2(rp, request, spider)
if isinstance(rp, DelayingProtegoRobotParser):
parts = urlparse_cached(request)
domain = parts.netloc
if domain not in self._delays:
user_agent = self._robotstxt_useragent
if not user_agent:
user_agent = request.headers.get(
b"User-Agent", self._default_useragent
)
user_agent = request.headers.get(b"User-Agent", self._default_useragent)
delay = rp.delay(user_agent) or 0.0
self._delays[domain] = delay
if delay:
logger.info(
f"Set download delay to {delay} according to robots.txt. domain: {domain}"
)
logger.info(f"Set download delay to {delay} according to robots.txt. domain: {domain}")

def robot_parser(self, request: Request, spider: Spider):
url = urlparse_cached(request)
@@ -207,19 +202,15 @@ def process_request(self, request: Request, spider: Spider):
super().process_request(request, spider)
prefix = "dpk_connector/requested"
if not request.meta.get("system_request", False):
_update_request_stats(
self.stats, request, spider, prefix, self.skip_domains
)
_update_request_stats(self.stats, request, spider, prefix, self.skip_domains)
if request.meta.get("sitemap", False):
_update_sitemap_stats(self.stats, spider, prefix)

def process_response(self, request: Request, response: Response, spider: Spider):
ret = super().process_response(request, response, spider)
prefix = "dpk_connector/accessed"
if not request.meta.get("system_request", False):
_update_stats(
self.stats, request, response, spider, prefix, self.skip_domains
)
_update_stats(self.stats, request, response, spider, prefix, self.skip_domains)
if request.meta.get("sitemap", False):
_update_sitemap_stats(self.stats, spider, prefix)
return ret
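
The `DelayingProtegoRobotParser.delay` hunk above derives a download delay from the robots.txt `Crawl-delay` and `Request-rate` directives and caps it at the configured maximum. A small standalone mirror of that arithmetic, with made-up directive values (the real method reads them from a parsed robots.txt):

```python
def robots_delay(crawl_delay, rate_seconds, rate_requests, max_delay):
    """Take the stricter of Crawl-delay and Request-rate, clamped to max_delay."""
    if crawl_delay is None and rate_requests is None:
        return None  # neither directive present: no extra delay
    crawl_delay = crawl_delay or 0
    per_request = rate_seconds / rate_requests if rate_requests else 0
    return min(max(crawl_delay, per_request), max_delay)


print(robots_delay(2, 5, 1, 60))          # 5.0 -- Request-rate (1 request / 5 s) wins
print(robots_delay(120, None, None, 60))  # 60  -- Crawl-delay of 120 s is clamped
```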
4 changes: 2 additions & 2 deletions data-connector-lib/src/dpk_connector/core/pipelines.py
@@ -11,12 +11,12 @@
################################################################################

from typing import Any

from dpk_connector.core.item import ConnectorItem
from scrapy import Spider
from scrapy.crawler import Crawler
from scrapy.exceptions import DropItem

from dpk_connector.core.item import ConnectorItem


class DropPipeline:
@classmethod
67 changes: 20 additions & 47 deletions data-connector-lib/src/dpk_connector/core/spiders/sitemap.py
@@ -14,14 +14,6 @@
from typing import Any, Callable, Collection, Generator
from urllib.parse import ParseResult

from scrapy import Request
from scrapy.http import Response
from scrapy.link import Link
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import SitemapSpider
from scrapy.spiders.sitemap import iterloc
from scrapy.utils.sitemap import Sitemap, sitemap_urls_from_robots

from dpk_connector.core.item import ConnectorItem
from dpk_connector.core.utils import (
get_base_url,
@@ -32,6 +24,13 @@
is_allowed_path,
urlparse_cached,
)
from scrapy import Request
from scrapy.http import Response
from scrapy.link import Link
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import SitemapSpider
from scrapy.spiders.sitemap import iterloc
from scrapy.utils.sitemap import Sitemap, sitemap_urls_from_robots


class BaseSitemapSpider(SitemapSpider):
@@ -99,13 +98,9 @@ def __init__(
self.allowed_domains.add(fqdn)
else:
self.allowed_domains = set(get_etld1(url) for url in seed_urls)
self.allow_mime_types = set(
[m.lower() for m in allow_mime_types] if len(allow_mime_types) > 0 else ()
)
self.allow_mime_types = set([m.lower() for m in allow_mime_types] if len(allow_mime_types) > 0 else ())
self.disallow_mime_types = set(
[m.lower() for m in disallow_mime_types]
if len(disallow_mime_types) > 0
else ()
[m.lower() for m in disallow_mime_types] if len(disallow_mime_types) > 0 else ()
)

# Link extraction from html
@@ -161,15 +156,11 @@ def start_requests(self):
)

def _parse_sitemap(self, response: Response):
yield ConnectorItem(
dropped=False, downloaded=False, system_request=True, sitemap=True
)
yield ConnectorItem(dropped=False, downloaded=False, system_request=True, sitemap=True)

seed_url = response.meta["seed_url"]

if response.url.endswith("/robots.txt") or response.url.endswith(
"/robots.txt/"
):
if response.url.endswith("/robots.txt") or response.url.endswith("/robots.txt/"):
for url in sitemap_urls_from_robots(response.text, base_url=response.url):
yield Request(
url,
@@ -197,9 +188,7 @@ def _parse_sitemap(self, response: Response):

if s.type == "sitemapindex":
for loc in iterloc(it, self.sitemap_alternate_links):
if any(
x.search(loc) for x in self._follow
) and self._is_allowed_path(loc):
if any(x.search(loc) for x in self._follow) and self._is_allowed_path(loc):
yield Request(
loc,
callback=self._parse_sitemap,
@@ -244,9 +233,7 @@ def _should_download(self, content_type: str | None) -> bool:
return not self._is_disallowed_content_type(ctype)
if not self.disallow_mime_types:
return self._is_allowed_content_type(ctype)
return (
not self._is_disallowed_content_type(ctype)
) and self._is_allowed_content_type(ctype)
return (not self._is_disallowed_content_type(ctype)) and self._is_allowed_content_type(ctype)

def _explore_sitemap(self, response: Response) -> Generator[Request, Any, None]:
depth = response.meta.get("depth", 0)
Expand All @@ -255,9 +242,7 @@ def _explore_sitemap(self, response: Response) -> Generator[Request, Any, None]:
parts = urlparse_cached(response)
domain = parts.netloc
if domain not in self.sitemaps_seen:
self.log(
f"New domain {domain} found. Search for sitemap.", logging.INFO
)
self.log(f"New domain {domain} found. Search for sitemap.", logging.INFO)
self.sitemaps_seen.add(domain)
for sitemap in self._get_sitemap_urls(parts):
yield Request(
@@ -272,9 +257,7 @@ def _explore_links(
},
)

def _explore_links(
self, response: Response, links: list[Link]
) -> Generator[Request, Any, None]:
def _explore_links(self, response: Response, links: list[Link]) -> Generator[Request, Any, None]:
depth = response.meta.get("depth", 0)
depth_limit = self.depth_limit
if depth_limit == 0 or depth < depth_limit:
@@ -303,9 +286,7 @@ def __init__(

self.callback = callback

def parse(
self, response: Response, **kwargs: Any
) -> Generator[Request | ConnectorItem, Any, None]:
def parse(self, response: Response, **kwargs: Any) -> Generator[Request | ConnectorItem, Any, None]:
drop = False
content_type = get_content_type(response)
if not content_type:
@@ -315,24 +296,16 @@ def parse(
if not (is_html or should_download):
drop = True
if drop:
yield ConnectorItem(
dropped=True, downloaded=False, system_request=False, sitemap=False
)
yield ConnectorItem(dropped=True, downloaded=False, system_request=False, sitemap=False)
return

# Download contents
if should_download:
self.callback(
str(response.url), response.body, response.headers.to_unicode_dict()
)
self.callback(str(response.url), response.body, response.headers.to_unicode_dict())
# to count up downloaded pages and collect stats
yield ConnectorItem(
dropped=False, downloaded=True, system_request=False, sitemap=False
)
yield ConnectorItem(dropped=False, downloaded=True, system_request=False, sitemap=False)
else:
yield ConnectorItem(
dropped=False, downloaded=False, system_request=False, sitemap=False
)
yield ConnectorItem(dropped=False, downloaded=False, system_request=False, sitemap=False)

# Search for sitemap
yield from self._explore_sitemap(response)
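
The `_should_download` hunk above only shows the tail of the allow/disallow MIME-type decision. A standalone approximation of that logic follows; the branch for "both sets empty" and the prefix matching inside `_is_allowed_content_type` are assumptions, since they are not visible in the hunk:

```python
def should_download(content_type, allow_mime_types, disallow_mime_types):
    """Disallow list vetoes, allow list must match; no filters lets everything through."""
    ctype = (content_type or "").lower()
    allowed = any(ctype.startswith(m) for m in allow_mime_types)
    disallowed = any(ctype.startswith(m) for m in disallow_mime_types)
    if not allow_mime_types and not disallow_mime_types:
        return True  # assumption: no configured filters means download everything
    if not allow_mime_types:
        return not disallowed
    if not disallow_mime_types:
        return allowed
    return (not disallowed) and allowed


print(should_download("application/pdf", {"application/pdf"}, set()))  # True
print(should_download("text/html", set(), {"text/html"}))              # False
```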
@@ -95,6 +95,7 @@ def list_folders(self, key: str) -> tuple[list[str], int]:
:param key: complete folder
:return: list of folders within a given folder and number of retries
"""

def _get_sub_folders(bck: str, p: str) -> tuple[list[str], int]:
sub_folders = []
# use paginator
@@ -113,6 +114,7 @@ def _get_sub_folders(bck: str, p: str) -> tuple[list[str], int]:
internal_retries += r
sub_folders.extend(sf)
return sub_folders, internal_retries

bucket, prefix = self._get_bucket_key(key)
subs, retries = _get_sub_folders(bck=bucket, p=prefix)
return [f"{bucket}/{f}" for f in subs], retries
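
The `list_folders` hunk above (from a storage-access file whose name is not shown in this capture) walks sub-folders with a paginator and accumulates retries inside a nested helper. A rough, hedged sketch of the same traversal with boto3's `list_objects_v2` paginator, without the retry bookkeeping; the bucket and prefix values are placeholders:

```python
import boto3


def list_subfolders(client, bucket: str, prefix: str) -> list[str]:
    """Recursively collect 'folder' prefixes under `prefix`, one delimiter level at a time."""
    folders: list[str] = []
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/"):
        for common in page.get("CommonPrefixes", []):
            sub = common["Prefix"]
            folders.append(sub)
            folders.extend(list_subfolders(client, bucket, sub))
    return folders


# Usage sketch (placeholder bucket/prefix):
# s3 = boto3.client("s3")
# print(list_subfolders(s3, "my-bucket", "datasets/"))
```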