Skip to content

Commit

Permalink
MUR SST Kerchunk Recipe
Browse files Browse the repository at this point in the history
  • Loading branch information
sbquinlan committed Nov 16, 2023
1 parent 6c899ad commit 8e77f78
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 0 deletions.
20 changes: 20 additions & 0 deletions recipes/mursst/meta.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
title: 'GHRSST Level 4 MUR Global Foundation Sea Surface Temperature Analysis (v4.1)'
description: 'A Group for High Resolution Sea Surface Temperature (GHRSST) Level 4 sea surface temperature analysis produced as a retrospective dataset (four day latency) and near-real-time dataset (one day latency) at the JPL Physical Oceanography DAAC using wavelets as basis functions in an optimal interpolation approach on a global 0.01 degree grid'
pangeo_forge_version: '0.9.2'
recipes:
- id: MUR-JPL-L4-GLOB-v4.1
object: 'recipe:recipe'
provenance:
providers:
- name: 'NASA JPL PO.DAAC'
description: 'Physical Oceanography Distributed Active Archive Center'
roles:
- producer
- licensor
url: https://podaac.jpl.nasa.gov/dataset/MUR-JPL-L4-GLOB-v4.1
license: 'Open Data'
maintainers:
- name: 'Development Seed'
github: developmentseed
bakery:
id: 'pangeo-ldeo-nsf-earthcube'
248 changes: 248 additions & 0 deletions recipes/mursst/recipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
import base64
import json
import os
from dataclasses import dataclass, field
from typing import Dict, Set, Union

import apache_beam as beam
import requests
from cmr import GranuleQuery
from kerchunk.combine import MultiZarrToZarr
from xarray import Dataset

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.storage import FSSpecTarget
from pangeo_forge_recipes.transforms import (
CombineReferences,
OpenWithKerchunk,
RequiredAtRuntimeDefault,
WriteCombinedReference,
)
from pangeo_forge_recipes.writers import ZarrWriterMixin

HTTP_REL = 'http://esipfed.org/ns/fedsearch/1.1/data#'
S3_REL = 'http://esipfed.org/ns/fedsearch/1.1/s3#'

# This recipe requires the following environment variables from Earthdata
ED_TOKEN = os.environ['EARTHDATA_TOKEN']
ED_USERNAME = os.environ['EARTHDATA_USERNAME']
ED_PASSWORD = os.environ['EARTHDATA_PASSWORD']

CREDENTIALS_API = 'https://archive.podaac.earthdata.nasa.gov/s3credentials'
SHORT_NAME = 'MUR-JPL-L4-GLOB-v4.1'
CONCAT_DIM = 'time'
IDENTICAL_DIMS = ['lat', 'lon']
SELECTED_VARS = ['analysed_sst', 'analysis_error', 'mask', 'sea_ice_fraction']

# use HTTP_REL if S3 access is not possible. S3_REL is faster.
selected_rel = S3_REL


def earthdata_auth(username: str, password: str):
login_resp = requests.get(CREDENTIALS_API, allow_redirects=False)
login_resp.raise_for_status()

encoded_auth = base64.b64encode(f'{username}:{password}'.encode('ascii'))
auth_redirect = requests.post(
login_resp.headers['location'],
data={'credentials': encoded_auth},
headers={'Origin': CREDENTIALS_API},
allow_redirects=False,
)
auth_redirect.raise_for_status()

final = requests.get(auth_redirect.headers['location'], allow_redirects=False)

results = requests.get(CREDENTIALS_API, cookies={'accessToken': final.cookies['accessToken']})
results.raise_for_status()

creds = json.loads(results.content)
return {
'aws_access_key_id': creds['accessKeyId'],
'aws_secret_access_key': creds['secretAccessKey'],
'aws_session_token': creds['sessionToken'],
}


def filter_data_links(links, rel):
return filter(
lambda link: link['rel'] == rel
and (link['href'].endswith('.nc') or link['href'].endswith('.nc4')),
links,
)


def gen_data_links(rel):
granules = GranuleQuery().short_name(SHORT_NAME).downloadable(True).get_all()
for granule in granules:
s3_links = filter_data_links(granule['links'], rel)
first = next(s3_links, None)
# throw if CMR does not have exactly one S3 link for an item
if not first or next(s3_links, None) is not None:
raise ValueError(f"Expected 1 link of type {rel} on {granule['title']}")
yield first['href']


@dataclass
class FilterVars(beam.PTransform):
"""Filter kerchunk variables by name."""

keep: Set[str] = field(default_factory=set)

@staticmethod
def _filter(item: list[dict], keep: Set[str]) -> list[dict]:
filtered = []
for translated in item:
# see Kerchunk docs for dropping a variable example
translated['refs'] = {
k: v
for k, v in translated['refs'].items()
if '/' not in k or k.split('/')[0] in keep
}
filtered.append(translated)
return filtered

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | beam.Map(self._filter, keep=self.keep)


@dataclass
class ConsolidateMetadata(beam.PTransform):
"""Consolidate metadata into a single .zmetadata file.
See zarr.consolidate_metadata() for details.
"""

storage_options: Dict = field(default_factory=dict)

@staticmethod
def _consolidate(mzz: MultiZarrToZarr, storage_options: Dict) -> Dict:
import fsspec
import zarr
from kerchunk.utils import consolidate

out = mzz.translate()
fs = fsspec.filesystem(
'reference',
fo=out,
remote_options=storage_options,
)
mapper = fs.get_mapper()
metadata_key = '.zmetadata'
zarr.consolidate_metadata(mapper, metadata_key=metadata_key)
double_consolidated = consolidate(dict([(metadata_key, mapper[metadata_key])]))
out['refs'] = out['refs'] | double_consolidated['refs']
return out

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | beam.Map(self._consolidate, storage_options=self.storage_options)


@dataclass
class WriteReferences(beam.PTransform, ZarrWriterMixin):
"""Store a singleton PCollection consisting of a ``kerchunk.combine.MultiZarrToZarr`` object.
:param store_name: Zarr store will be created with this name under ``target_root``.
:param output_file_name: Name to give the output references file
(``.json`` or ``.parquet`` suffix).
"""

store_name: str
output_file_name: str = 'reference.json'
target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
default_factory=RequiredAtRuntimeDefault
)
storage_options: Dict = field(default_factory=dict)

@staticmethod
def _write(
refs: Dict, full_target: FSSpecTarget, output_file_name: str, storage_options: Dict
) -> Dataset:
import fsspec
import ujson
import xarray as xr

outpath = full_target._full_path(output_file_name)
with full_target.fs.open(outpath, 'wb') as f:
f.write(ujson.dumps(refs).encode())

fs = fsspec.filesystem(
'reference', fo=full_target._full_path(output_file_name), remote_options=storage_options
)
return xr.open_dataset(
fs.get_mapper(), engine='zarr', backend_kwargs={'consolidated': True}
)

def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | beam.Map(
self._write,
full_target=self.get_full_target(),
output_file_name=self.output_file_name,
storage_options=self.storage_options,
)


@dataclass
class ValidateDatasetDimensions(beam.PTransform):
"""Open the reference.json in xarray and validate dimensions."""

expected_dims: Dict = field(default_factory=dict)

@staticmethod
def _validate(ds: Dataset, expected_dims: Dict) -> None:
if set(ds.dims) != expected_dims.keys():
raise ValueError(f'Expected dimensions {expected_dims.keys()}, got {ds.dims}')
for dim, bounds in expected_dims.items():
if bounds is None:
continue
lo, hi = bounds
actual_lo, actual_hi = round(ds[dim].data.min()), round(ds[dim].data.max())
if actual_lo != lo or actual_hi != hi:
raise ValueError(f'Expected {dim} range [{lo}, {hi}], got {actual_lo, actual_hi}')
return ds

def expand(
self,
pcoll: beam.PCollection,
) -> beam.PCollection:
return pcoll | beam.Map(self._validate, expected_dims=self.expected_dims)


fsspec_auth_kwargs = (
{'headers': {'Authorization': f'Bearer {ED_TOKEN}'}}
if selected_rel == HTTP_REL
else {'client_kwargs': earthdata_auth(ED_USERNAME, ED_PASSWORD)}
)
pattern = pattern_from_file_sequence(
list(gen_data_links(selected_rel)), CONCAT_DIM, fsspec_open_kwargs=fsspec_auth_kwargs
)

# target_root is injected only into certain transforms in pangeo-forge-recipes
# this is a hacky way to pull it out of the WriteCombinedReference transform
hacky_way_to_pull = WriteCombinedReference(
store_name=SHORT_NAME,
concat_dims=pattern.concat_dims,
identical_dims=IDENTICAL_DIMS,
)
recipe = (
beam.Create(pattern.items())
| OpenWithKerchunk(
remote_protocol='s3' if selected_rel == S3_REL else 'https',
file_type=pattern.file_type,
# lat/lon are around 5k, this is the best option for forcing kerchunk to inline them
inline_threshold=6000,
storage_options=pattern.fsspec_open_kwargs,
)
| FilterVars(keep={*pattern.concat_dims, *IDENTICAL_DIMS, *SELECTED_VARS})
| CombineReferences(
concat_dims=pattern.concat_dims,
identical_dims=IDENTICAL_DIMS,
)
| ConsolidateMetadata(storage_options=pattern.fsspec_open_kwargs)
| WriteReferences(
store_name=SHORT_NAME,
target_root=hacky_way_to_pull.target_root,
storage_options=pattern.fsspec_open_kwargs,
)
| ValidateDatasetDimensions(expected_dims={'time': None, 'lat': (-90, 90), 'lon': (-180, 180)})
)
3 changes: 3 additions & 0 deletions recipes/mursst/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
s3fs==2023.10.0
boto3==1.28.71
python-cmr==0.9.0

0 comments on commit 8e77f78

Please sign in to comment.