Skip to content

Commit

Permalink
Settle on 'page_batch' and 'page_batches' terminology
Browse files Browse the repository at this point in the history
  • Loading branch information
amywieliczka committed Feb 14, 2024
1 parent 3c1c3b2 commit d833812
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 47 deletions.
36 changes: 19 additions & 17 deletions dags/harvest_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ def merge_children(version):


@task()
def get_mapped_page_filenames_task(mapped_pages):
mapped_pages = [
[mapped['mapped_page_path'] for mapped in mapped_page_list
if mapped['mapped_page_path']]
for mapped_page_list in mapped_pages]
def get_mapped_page_filenames_task(mapped_page_batches: list[list[dict]]):
batches = []
for mapped_page_batch in mapped_page_batches:
batch = [
mapped_page['mapped_page_path'] for mapped_page in mapped_page_batch
if mapped_page['mapped_page_path']]
batches.append(json.dumps(batch))

return json.dumps(mapped_pages)
return batches


@dag(
Expand All @@ -104,24 +106,24 @@ def harvest():
collection = get_collection_metadata_task()

vernacular_version = create_vernacular_version_task(collection=fetchdata)
fetched_pages = fetch_collection_task(
fetched_page_batches = fetch_collection_task(
collection=fetchdata, vernacular_version=vernacular_version)
mapped_data_version = create_mapped_version_task(
collection=collection,
vernacular_pages=fetched_pages
vernacular_page_batches=fetched_page_batches
)
mapped_pages = (
mapped_status_batches = (
map_page_task
.partial(collection=collection, mapped_data_version=mapped_data_version)
.expand(vernacular_pages=fetched_pages)
.expand(vernacular_page_batch=fetched_page_batches)
)

mapping_status = get_mapping_status_task(collection, mapped_pages)
mapping_status = get_mapping_status_task(collection, mapped_status_batches)
validate_collection_task(collection['id'], mapping_status['mapped_page_paths'])
mapped_page_paths = get_mapped_page_filenames_task(mapped_pages)
mapped_page_batches = get_mapped_page_filenames_task(mapped_status_batches)

with_content_urls_version = create_with_content_urls_version_task(
collection, mapped_pages)
collection, mapped_page_batches)

content_harvest_task = (
ContentHarvestOperator
Expand All @@ -132,14 +134,14 @@ def harvest():
mapper_type=collection['rikolti_mapper_type']
)
.expand(
pages=mapped_page_paths
pages=mapped_page_batches
)
)

merged_parent_records = merge_children(with_content_urls_version)
content_harvest_task >> merged_parent_records
merged_pages = merge_children(with_content_urls_version)
content_harvest_task >> merged_pages

stage_index = create_stage_index_task(collection, merged_parent_records)
stage_index = create_stage_index_task(collection, merged_pages)
cleanup_failed_index_creation_task(stage_index)


Expand Down
21 changes: 13 additions & 8 deletions dags/mapper_dag.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
from datetime import datetime
from typing import Optional

Expand All @@ -9,22 +10,26 @@
from rikolti.dags.shared_tasks import map_page_task
from rikolti.dags.shared_tasks import get_mapping_status_task
from rikolti.dags.shared_tasks import validate_collection_task
from rikolti.dags.shared_tasks import paginate_filepaths_for_fanout
from rikolti.dags.shared_tasks import batched
from rikolti.utils.versions import get_most_recent_vernacular_version
from rikolti.utils.versions import get_most_recent_mapped_version
from rikolti.utils.versions import get_vernacular_pages
from rikolti.utils.versions import get_mapped_pages


@task()
def get_vernacular_pages_task(collection: dict, params: Optional[dict]=None):
def get_vernacular_page_batches_task(
collection: dict, params: Optional[dict]=None) -> list[list[str]]:
collection_id = collection['id']
vernacular_version = params.get('vernacular_version') if params else None
if not vernacular_version:
vernacular_version = get_most_recent_vernacular_version(collection_id)
pages = get_vernacular_pages(vernacular_version)
# TODO: split page_list into pages and children?
return paginate_filepaths_for_fanout(pages)

# 1024 is the maximum number of fanout tasks allowed
batch_size = math.ceil(len(pages) / 1024)
return batched(pages, batch_size)

@task()
def get_mapped_pages_task(params: Optional[dict] = None):
Expand Down Expand Up @@ -65,18 +70,18 @@ def get_mapped_pages_task(params: Optional[dict] = None):
)
def mapper_dag():
collection = get_collection_metadata_task()
page_list = get_vernacular_pages_task(collection=collection)
page_batches = get_vernacular_page_batches_task(collection=collection)
mapped_data_version = create_mapped_version_task(
collection=collection,
vernacular_pages=page_list
vernacular_page_batches=page_batches
)
mapped_pages = (
mapped_status_batches = (
map_page_task
.partial(collection=collection, mapped_data_version=mapped_data_version)
.expand(vernacular_pages=page_list)
.expand(vernacular_page_batch=page_batches)
)

mapping_status = get_mapping_status_task(collection, mapped_pages)
mapping_status = get_mapping_status_task(collection, mapped_status_batches)
validate_collection_task(collection['id'], mapping_status['mapped_page_paths'])

mapper_dag()
Expand Down
49 changes: 27 additions & 22 deletions dags/shared_tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import boto3
import pprint
import json
import os
import math
from datetime import datetime
Expand Down Expand Up @@ -42,13 +43,12 @@ def get_collection_fetchdata_task(params=None):
return resp.json()


def paginate_filepaths_for_fanout(filepaths):
    """
    splits filepaths into consecutive pages, sized so that no more than
    1024 pages are produced, ex: 2048 filepaths become 1024 pages of 2
    filepaths each

    returns a list of lists of filepaths; an empty filepaths list returns
    an empty list
    """
    if not filepaths:
        # ceil(0 / 1024) is 0 and range() rejects a step of zero, so an
        # empty input has to be answered before computing the page size
        return []

    # 1024 is the maximum number of fanout tasks allowed
    page_size = math.ceil(len(filepaths) / 1024)
    return [
        filepaths[i:i + page_size]
        for i in range(0, len(filepaths), page_size)
    ]
# TODO: in python3.12 we can use itertools.batched
def batched(l, batch_size):
    """
    splits the list l into consecutive slices of at most batch_size items,
    ex: batched([1, 2, 3, 4, 5], 2) returns [[1, 2], [3, 4], [5]]

    returns a list of lists; an empty l returns an empty list regardless of
    batch_size, so callers that derive batch_size from len(l) (and so pass
    0 for an empty list) get no batches back instead of an error

    raises ValueError if l is not empty and batch_size is less than 1
    """
    if not l:
        return []
    if batch_size < 1:
        # a step of 0 makes range() raise, and a negative step makes it
        # yield nothing, which would silently drop every item in l
        raise ValueError(
            f"batch_size must be at least 1, got {batch_size}")
    return [l[i:i + batch_size] for i in range(0, len(l), batch_size)]


@task()
Expand All @@ -58,7 +58,8 @@ def create_vernacular_version_task(collection) -> str:


@task()
def fetch_collection_task(collection: dict, vernacular_version: str):
def fetch_collection_task(
collection: dict, vernacular_version: str) -> list[list[str]]:
"""
returns a list of the filepaths of the vernacular metadata relative to the
collection id, ex: [
Expand Down Expand Up @@ -137,7 +138,9 @@ def flatten_stats(stats):
f"\n{pprint.pprint(fetch_status)}\n{stats['success']}\n{stats['filepaths']}"
)

return paginate_filepaths_for_fanout(stats['filepaths'])
# 1024 is the maximum number of fanout tasks allowed
batch_size = math.ceil(len(stats['filepaths']) / 1024)
return batched(stats['filepaths'], batch_size)


@task(multiple_outputs=True)
Expand All @@ -159,11 +162,11 @@ def get_collection_metadata_task(params=None):
# instances can be running at the same time, *across all DAG runs*
@task()
def map_page_task(
vernacular_pages: Union[str,list[str]],
vernacular_page_batch: Union[str,list[str]],
collection: dict,
mapped_data_version: str):
"""
vernacular_pages is a list of filepaths relative to the collection id, ex:
vernacular_page_batch is a list of filepaths relative to the collection id, ex:
[ 3433/vernacular_metadata_2023-01-01T00:00:00/data/1 ]
or:
[
Expand All @@ -184,15 +187,15 @@ def map_page_task(
return False

mapped_pages = []
for vernacular_page in vernacular_pages:
for vernacular_page in vernacular_page_batch:
mapped_page = map_page(
collection_id, vernacular_page, mapped_data_version, collection)
mapped_pages.append(mapped_page)
return mapped_pages


@task(multiple_outputs=True)
def get_mapping_status_task(collection: dict, paginated_mapped_pages: list):
def get_mapping_status_task(collection: dict, mapped_page_batches: list) -> dict:
"""
mapped_page_batches is a list of lists of dicts with the following keys:
status: success
Expand All @@ -208,15 +211,15 @@ def get_mapping_status_task(collection: dict, paginated_mapped_pages: list):
]
"""
mapped_pages = []
for pages in paginated_mapped_pages:
mapped_pages.extend(pages)
for batch in mapped_page_batches:
mapped_pages.extend(batch)
mapping_status = get_mapping_status(collection, mapped_pages)

return mapping_status


@task()
def create_mapped_version_task(collection, vernacular_pages):
def create_mapped_version_task(collection, vernacular_page_batches) -> str:
"""
vernacular_page_batches is a list of lists of the filepaths of the vernacular
metadata relative to the collection id, ex: [
Expand All @@ -226,10 +229,12 @@ def create_mapped_version_task(collection, vernacular_pages):
returns the path to a new mapped version, ex:
"3433/vernacular_metadata_2023-01-01T00:00:00/mapped_metadata_2023-01-01T00:00:00/"
"""
vernacular_version = get_version(collection.get('id'), vernacular_pages[0][0])
vernacular_page_batch = vernacular_page_batches[0]
vernacular_page = vernacular_page_batch[0]
vernacular_version = get_version(collection.get('id'), vernacular_page)
if not vernacular_version:
raise ValueError(
f"Vernacular version not found in {vernacular_pages[0][0]}")
f"Vernacular version not found in {vernacular_page}")
mapped_data_version = create_mapped_version(vernacular_version)
return mapped_data_version

Expand Down Expand Up @@ -264,10 +269,10 @@ def validate_collection_task(collection_id: int, mapped_metadata_pages: dict) ->


@task()
def create_with_content_urls_version_task(collection: dict, mapped_pages: list[list[dict]]):
mapped_page_path = [page['mapped_page_path'] for page in mapped_pages[0]
if page['mapped_page_path']][0]
mapped_version = get_version(collection['id'], mapped_page_path)
def create_with_content_urls_version_task(
        collection: dict, mapped_page_batches: list[str]):
    """
    mapped_page_batches is a list of json-encoded lists of mapped page
    paths; the first path found in any batch is passed to get_version to
    look up the mapped version

    returns the result of create_with_content_urls_version for that mapped
    version

    raises ValueError if no batch contains a mapped page path
    """
    for encoded_batch in mapped_page_batches:
        mapped_page_batch = json.loads(encoded_batch)
        # a batch may be an empty list, so keep looking rather than
        # indexing into the first batch unconditionally
        if mapped_page_batch:
            mapped_version = get_version(
                collection['id'], mapped_page_batch[0])
            return create_with_content_urls_version(mapped_version)

    raise ValueError(
        f"No mapped page paths found for collection {collection['id']}")


Expand Down

0 comments on commit d833812

Please sign in to comment.