Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement MARC fetcher and mappers #674

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dags/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ opensearch-py
requests
sickle
python-dotenv
beautifulsoup4
lxml
pymarc
MarkupSafe
apache-airflow-providers-docker
apache-airflow-providers-google
apache-airflow-providers-amazon
6 changes: 6 additions & 0 deletions dags/utils_by_mapper_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ def map_endpoint_task(endpoint, fetched_versions, params=None):
mapper_job_results = map_endpoint(endpoint, fetched_versions, limit)
mapped_versions = {}
for mapper_job_result in mapper_job_results:
if not mapper_job_result.get('mapped_page_paths'):
logging.warning(
f"{mapper_job_result['collection_id']:<6}: "
"no mapped pages written"
)
continue
mapped_version = get_version(
mapper_job_result['collection_id'],
mapper_job_result['mapped_page_paths'][0]
Expand Down
4 changes: 3 additions & 1 deletion metadata_fetcher/fetch_registry_collections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
import logging
import sys
import traceback

import requests

Expand Down Expand Up @@ -75,7 +76,8 @@ def fetch_endpoint(url, limit=None, job_logger=logger):
print(f"ERROR fetching collection { collection_id }: {e}")
results[collection_id] = {
'status': 'error',
'error_message': e
'error_message': e,
'traceback': traceback.format_exc()
}
continue

Expand Down
8 changes: 7 additions & 1 deletion metadata_fetcher/fetchers/Fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from requests.adapters import HTTPAdapter, Retry
from rikolti.utils.versions import put_vernacular_page

import time

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,8 +52,14 @@ def fetch_page(self):
f"[{self.collection_id}]: fetching page {self.write_page} "
f"at {page.get('url')}"
)

try:
response = requests.get(**page)
if response.status_code == 503:
# TIND sometimes throws a 503 error, so we'll sleep & try again;
# 28011 was most notorious
time.sleep(1)
response = requests.get(**page)
response.raise_for_status()
except requests.exceptions.HTTPError:
raise FetchError(
Expand Down
23 changes: 23 additions & 0 deletions metadata_fetcher/fetchers/marc_fetcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import requests

from .Fetcher import Fetcher
import json
import pymarc


class MarcFetcher(Fetcher):
    """Fetcher that retrieves a MARC file from a single configured URL."""

    def __init__(self, params: dict):
        """
        :param params: Fetcher parameters. The MARC file location is read from
                       `params["harvest_data"]["url"]`; `self.url` is None when
                       that key is not provided.
        """
        super().__init__(params)
        # Tolerate a missing/None `harvest_data` instead of failing with an
        # AttributeError on None.
        self.url = (params.get("harvest_data") or {}).get("url")

    def build_fetch_request(self) -> dict:
        """
        :return: Keyword arguments describing the request for the MARC file.
        """
        return {"url": self.url}

    def check_page(self, http_resp: requests.Response) -> int:
        """
        Count the MARC records contained in the response body.

        pymarc's MARCReader yields None in place of a record it cannot parse,
        so those placeholders are left out of the count.

        :param http_resp: The response whose content is the MARC data.
        :return: Number of records that were successfully parsed.
        """
        reader = pymarc.MARCReader(http_resp.content,
                                   to_unicode=True,
                                   utf8_handling="replace")
        return sum(1 for record in reader if record is not None)

    def json(self) -> str:
        """
        :return: JSON-encoded fetch status, which always reports `finished`.
        """
        return json.dumps({"finished": True})

1 change: 1 addition & 0 deletions metadata_fetcher/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ requests
sickle
python-dotenv
beautifulsoup4
pymarc
220 changes: 220 additions & 0 deletions metadata_mapper/mappers/marc/marc_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
from ..oai.oai_mapper import OaiVernacular
from ..mapper import Record

from typing import Callable
import re
from itertools import chain


class MarcRecord(Record):
def UCLDC_map(self):
    """
    Return the UCLDC field mapping for this record.

    :return: An empty dict; no fields are mapped here.
    """
    mapping = {}
    return mapping

def get_marc_control_field(self, field_tag: str, index: int = None) -> str:
    """
    Get the value of a MARC control field (tags below 010).

    See: https://www.loc.gov/marc/bibliographic/bd00x.html

    Returns an empty string if:
        * `field_tag` is not a control field tag
        * Control field isn't set
        * No value exists at the requested index
    Otherwise it returns the value of the first matching field, or the
    single character of that value found at `index`.

    :param field_tag: Field tag to retrieve.
    :param index: A specific character position to fetch. Position 0 is a
                  valid position; pass None (the default) for the whole value.
    :return: The control field value, one character of it, or "".
    """

    # Don't let any data tags sneak in! They have subfields. Control fields
    # are the 00X tags; 010 and above are data fields.
    if not (field_tag.isdecimal() and int(field_tag) < 10):
        return ""

    fields = self.marc_tags_as_dict([field_tag]).get(field_tag)
    if not fields:
        return ""

    value = fields[0].value()

    # `index is None` rather than truthiness, so that position 0 is honored.
    if index is None:
        return value

    # EAFP: every in-range position (including the last one) is returned.
    try:
        return value[index]
    except IndexError:
        return ""

def get_marc_data_fields(self, field_tags: list, subfield_codes=None, recurse=True,
                         **kwargs) -> list:
    """
    Collect subfield values from the requested MARC fields.

    `Data fields` is not a specific term in MARC. This function really will accept
    any field tags. See https://www.loc.gov/marc/bibliographic/ for all fields.

    In most cases, this returns the Cartesian product of the provided `field_tags`
    and `subfield_codes`. If `recurse` is true, each value is followed by the
    values of the same subfield code in the field 880 (alternate graphic
    representation, see https://www.loc.gov/marc/bibliographic/bd880.html) that
    is linked to the value's own field through subfield 6.

    Subfield 6 (Linkage) is only returned when "6" is explicitly listed in
    `subfield_codes`.

    Set the `exclude_subfields` kwarg to exclude the specified subfield_codes.

    Set the `process_value` kwarg to a callable taking
    `(value, field_tag, subfield_code)` to transform each value. It is not
    applied to the values taken from field 880.

    Identical groups of `[value] + alternate values` are returned only once.

    :param field_tags: A list of MARC fields.
    :param subfield_codes: A list of subfield codes to filter the values. If empty
                           or None, all subfields (except 6) will be included.
    :param recurse: Indicates whether alternate graphic representations (field 880)
                    should be sought.
    :return: A flat list of values of the specified subfields.
    """
    subfield_codes = subfield_codes or []
    exclude_subfields = bool(kwargs.get("exclude_subfields"))
    process_value = kwargs.get("process_value")
    if not callable(process_value):
        process_value = None

    def subfield_matches(check_code: str) -> bool:
        """
        :param check_code: The subfield code to test.
        :return: Whether values of this subfield code belong in the output.
        """
        # Always exclude subfield 6 (Linkage,
        # see: https://www.loc.gov/marc/bibliographic/ecbdcntf.html) unless it is
        # explicitly listed.
        if check_code == "6" and "6" not in subfield_codes:
            return False
        if not subfield_codes:
            return True
        if exclude_subfields:
            return check_code not in subfield_codes
        return check_code in subfield_codes

    # Index the 880 fields once by the (tag, occurrence number) they link back
    # to. The occurrence number in subfield 6 is a matching key, not a position
    # in the record, so 880 fields must be found by their own linkage.
    linked_880 = {}
    if recurse:
        for field_880 in self.marc_tags_as_dict(["880"]).get("880") or []:
            for linkage in field_880.subfields_as_dict().get("6", []):
                link = re.match(r"^([0-9A-Za-z]{3})-([0-9]+)", linkage)
                # Occurrence number 00 marks an 880 with no associated field.
                if link and int(link.group(2)) != 0:
                    linked_880.setdefault(
                        (link.group(1), int(link.group(2))), field_880)

    def linked_880_subfields(field_tag: str, linkages) -> dict:
        """
        :param field_tag: Tag of the field whose alternate is sought.
        :param linkages: The subfield 6 values of that same field, if any.
        :return: Subfields (code -> values) of the linked 880 field, or {}.
        """
        if not linked_880 or not linkages:
            return {}
        link = re.match(r"^880\-([0-9]+)$", linkages[0])
        if not link:
            return {}
        field_880 = linked_880.get((field_tag, int(link.group(1))))
        if field_880 is None:
            return {}
        return field_880.subfields_as_dict()

    # Each entry is [value, *alternate graphic representations of that value]
    value_groups = []

    # Iterate the fields that have tags matching those requested
    for field_tag, matching_fields in self.marc_tags_as_dict(field_tags).items():
        for matching_field in matching_fields:
            subfields = matching_field.subfields_as_dict()
            # The linkage is read from the field itself, so a field without
            # subfield 6 can never pick up another field's 880.
            alternates_by_code = linked_880_subfields(
                field_tag, subfields.get("6"))

            for code, values in subfields.items():
                # Ensure we're including only requested subfields
                if not subfield_matches(code):
                    continue
                alternates = list(alternates_by_code.get(code, []))

                for value in values:
                    if process_value:
                        value = process_value(value, field_tag, code)
                    group = [value] + alternates
                    # Dedupe the output, keeping first occurrences in order
                    if group not in value_groups:
                        value_groups.append(group)

    # Flatten the output
    return list(chain.from_iterable(value_groups))

def marc_tags_as_dict(self, field_tags: list) -> dict:
    """
    Look up the requested MARC fields in the source_metadata, keyed by tag.

    :param field_tags: List of MARC field tags to retrieve.
    :return: Dict mapping each requested tag to whatever `get_fields(tag)`
             returns for it on the record stored under `source_metadata["marc"]`.
    """
    fields_by_tag = {}
    for tag in field_tags:
        marc_record = self.source_metadata.get("marc")
        fields_by_tag[tag] = marc_record.get_fields(tag)
    return fields_by_tag

def get_marc_leader(self, leader_key: str):
    """
    Retrieve the value of specified leader key from the MARC metadata.
    See: https://www.loc.gov/marc/bibliographic/bdleader.html

    We're not accommodating passing a slice, which pymarc can handle should it be
    necessary.

    :param leader_key: Either a numeric position in the leader (int or numeric
                       string) or the name of an attribute of the leader object.
    :return: The value at that position / of that attribute, or "" when the
             position is out of range or the attribute does not exist.
    """
    leader = self.source_metadata.get("marc").leader

    key = str(leader_key)
    if key.isdecimal():
        try:
            return leader[int(key)]
        except IndexError:
            return ""

    # The builtin getattr, with a default, covers both the "has attribute"
    # and the "missing attribute" cases.
    return getattr(leader, key, "")


class MarcVernacular(OaiVernacular):
    """Vernacular for MARC sources; adds nothing to what OaiVernacular defines."""
Loading
Loading