-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MUR SST Kerchunk Recipe #259
Draft
sbquinlan
wants to merge
38
commits into
pangeo-forge:master
Choose a base branch
from
developmentseed:mursst-kerchunk
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 3 commits
Commits
Show all changes
38 commits
Select commit
Hold shift + click to select a range
8e77f78
MUR SST Kerchunk Recipe
sbquinlan 37000c6
Use filepattern for URLs and replace custom methods with WriteCombine…
abarciauskas-bgse 1f0218b
Fix linting
abarciauskas-bgse fe4f047
Make protocol an environment variable
abarciauskas-bgse ca7c666
Use protocol envionement variable
abarciauskas-bgse 7897490
Linting
abarciauskas-bgse b82e52b
Update recipe.py
abarciauskas-bgse e90707b
Update recipe.py
abarciauskas-bgse 32b7c48
Update recipe.py
abarciauskas-bgse 40e2885
Update recipe.py
abarciauskas-bgse 26af6c7
Update recipe.py
abarciauskas-bgse df6ec0e
Update recipe.py
abarciauskas-bgse 00f5992
Update recipe.py
abarciauskas-bgse 31d6b9e
Update recipe.py
abarciauskas-bgse 01f9ced
Update transforms
abarciauskas-bgse ece4650
Comment out failing transform
abarciauskas-bgse b4e2fbb
Add missing strings
abarciauskas-bgse 02d80c2
Update requirements.txt
abarciauskas-bgse d270f9b
Remove consolidate metadata
abarciauskas-bgse b56af98
Update recipe.py
abarciauskas-bgse 293c453
Update recipe.py
abarciauskas-bgse 78bb384
comment out validate
abarciauskas-bgse 7204f67
Update recipe.py
abarciauskas-bgse 30cad0a
Update recipe.py
abarciauskas-bgse bc075dc
Update recipe.py
abarciauskas-bgse 88394b4
Update recipe.py
abarciauskas-bgse f881b30
Update recipe.py
abarciauskas-bgse bd4a3d6
Update recipe.py
abarciauskas-bgse b97b227
Update recipe.py
abarciauskas-bgse 271d320
Update recipe.py
abarciauskas-bgse 2c411b0
Add requirements, recipe with scripts for running w/o runner
abarciauskas-bgse f62d830
Fix merge conflicts
abarciauskas-bgse 36f2e71
Remove code for running without runner
abarciauskas-bgse bc3f2df
Fix fsspec_open_kwargs
abarciauskas-bgse b5e1b18
remove validation
46faa71
Merge pull request #4 from developmentseed/mursst-kerchunk-gcorradini
abarciauskas-bgse a4da5b6
Update recipe.py
abarciauskas-bgse 644c753
Update recipe.py
abarciauskas-bgse File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
title: 'GHRSST Level 4 MUR Global Foundation Sea Surface Temperature Analysis (v4.1)' | ||
description: 'A Group for High Resolution Sea Surface Temperature (GHRSST) Level 4 sea surface temperature analysis produced as a retrospective dataset (four day latency) and near-real-time dataset (one day latency) at the JPL Physical Oceanography DAAC using wavelets as basis functions in an optimal interpolation approach on a global 0.01 degree grid' | ||
pangeo_forge_version: '0.9.2' | ||
recipes: | ||
- id: MUR-JPL-L4-GLOB-v4.1 | ||
object: 'recipe:recipe' | ||
provenance: | ||
providers: | ||
- name: 'NASA JPL PO.DAAC' | ||
description: 'Physical Oceanography Distributed Active Archive Center' | ||
roles: | ||
- producer | ||
- licensor | ||
url: https://podaac.jpl.nasa.gov/dataset/MUR-JPL-L4-GLOB-v4.1 | ||
license: 'Open Data' | ||
maintainers: | ||
- name: 'Development Seed' | ||
github: developmentseed | ||
bakery: | ||
id: 'pangeo-ldeo-nsf-earthcube' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
import base64 | ||
import json | ||
import os | ||
from dataclasses import dataclass, field | ||
from typing import Dict, Set | ||
|
||
import apache_beam as beam | ||
import pandas as pd | ||
import requests | ||
import xarray as xr | ||
import zarr | ||
from requests.auth import HTTPBasicAuth | ||
|
||
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern | ||
from pangeo_forge_recipes.transforms import OpenWithKerchunk, WriteCombinedReference | ||
|
||
HTTP_REL = 'http://esipfed.org/ns/fedsearch/1.1/data#' | ||
S3_REL = 'http://esipfed.org/ns/fedsearch/1.1/s3#' | ||
|
||
ED_USERNAME = os.environ['EARTHDATA_USERNAME'] | ||
ED_PASSWORD = os.environ['EARTHDATA_PASSWORD'] | ||
|
||
CREDENTIALS_API = 'https://archive.podaac.earthdata.nasa.gov/s3credentials' | ||
SHORT_NAME = 'MUR-JPL-L4-GLOB-v4.1' | ||
CONCAT_DIMS = ['time'] | ||
IDENTICAL_DIMS = ['lat', 'lon'] | ||
SELECTED_VARS = ['analysed_sst', 'analysis_error', 'mask', 'sea_ice_fraction'] | ||
|
||
# use HTTP_REL if S3 access is not possible. S3_REL is faster. | ||
selected_rel = HTTP_REL # S3_REL | ||
|
||
dates = [ | ||
d.to_pydatetime().strftime('%Y%m%d') | ||
for d in pd.date_range('2002-06-01', '2002-06-30', freq='D') | ||
] | ||
|
||
|
||
def make_filename(time): | ||
if selected_rel == HTTP_REL: | ||
base_url = ( | ||
f'https://archive.podaac.earthdata.nasa.gov/podaac-ops-cumulus-protected/{SHORT_NAME}/' | ||
) | ||
else: | ||
base_url = f's3://podaac-ops-cumulus-protected/{SHORT_NAME}/' | ||
# example file: "/20020601090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc" | ||
return f'{base_url}{time}090000-JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1.nc' | ||
|
||
|
||
concat_dim = ConcatDim('time', dates, nitems_per_file=1) | ||
pattern = FilePattern(make_filename, concat_dim) | ||
|
||
|
||
def get_earthdata_token(username, password): | ||
# URL for the Earthdata login endpoint | ||
login_url = 'https://urs.earthdata.nasa.gov/api/users/token' | ||
auth = HTTPBasicAuth(username, password) | ||
|
||
# Request a new token | ||
response = requests.get(f'{login_url}s', auth=auth) | ||
|
||
# Check if the request was successful | ||
if response.status_code == 200: | ||
if len(response.json()) == 0: | ||
# create new token | ||
response = requests.post(login_url, auth=auth) | ||
if response.status_code == 200: | ||
token = response.json()['access_token'] | ||
else: | ||
raise Exception('Error: Unable to generate Earthdata token.') | ||
else: | ||
# Token is usually in the response's JSON data | ||
token = response.json()[0]['access_token'] | ||
return token | ||
else: | ||
raise Exception('Error: Unable to retrieve Earthdata token.') | ||
|
||
|
||
def get_s3_creds(username, password, credentials_api=CREDENTIALS_API): | ||
login_resp = requests.get(CREDENTIALS_API, allow_redirects=False) | ||
login_resp.raise_for_status() | ||
|
||
encoded_auth = base64.b64encode(f'{username}:{password}'.encode('ascii')) | ||
auth_redirect = requests.post( | ||
login_resp.headers['location'], | ||
data={'credentials': encoded_auth}, | ||
headers={'Origin': credentials_api}, | ||
allow_redirects=False, | ||
) | ||
auth_redirect.raise_for_status() | ||
|
||
final = requests.get(auth_redirect.headers['location'], allow_redirects=False) | ||
|
||
results = requests.get(CREDENTIALS_API, cookies={'accessToken': final.cookies['accessToken']}) | ||
results.raise_for_status() | ||
|
||
creds = json.loads(results.content) | ||
return { | ||
'key': creds['accessKeyId'], | ||
'secret': creds['secretAccessKey'], | ||
'token': creds['sessionToken'], | ||
'anon': False, | ||
} | ||
|
||
|
||
def earthdata_auth(username: str, password: str): | ||
if selected_rel == S3_REL: | ||
return get_s3_creds(username, password) | ||
else: | ||
token = get_earthdata_token(username, password) | ||
return {'headers': {'Authorization': f'Bearer {token}'}} | ||
|
||
|
||
fsspec_open_kwargs = earthdata_auth(ED_USERNAME, ED_PASSWORD) | ||
|
||
|
||
@dataclass | ||
class FilterVars(beam.PTransform): | ||
"""Filter kerchunk variables by name.""" | ||
|
||
keep: Set[str] = field(default_factory=set) | ||
|
||
@staticmethod | ||
def _filter(item: list[dict], keep: Set[str]) -> list[dict]: | ||
filtered = [] | ||
for translated in item: | ||
# see Kerchunk docs for dropping a variable example | ||
translated['refs'] = { | ||
k: v | ||
for k, v in translated['refs'].items() | ||
if '/' not in k or k.split('/')[0] in keep | ||
} | ||
filtered.append(translated) | ||
return filtered | ||
|
||
def expand(self, pcoll: beam.PCollection) -> beam.PCollection: | ||
return pcoll | beam.Map(self._filter, keep=self.keep) | ||
|
||
|
||
# Remove method when https://github.com/pangeo-forge/pangeo-forge-recipes/pull/556/files is merged. | ||
@beam.ptransform_fn | ||
def ConsolidateMetadata(pcoll: beam.PCollection) -> beam.PCollection: | ||
"""Consolidate metadata into a single .zmetadata field. | ||
See zarr.consolidate_metadata() for details. | ||
""" | ||
|
||
def _consolidate(store: zarr.storage.FSStore) -> zarr.storage.FSStore: | ||
zarr.consolidate_metadata(store, path=None) | ||
return store | ||
|
||
return pcoll | beam.Map(_consolidate) | ||
|
||
|
||
@dataclass | ||
class ValidateDatasetDimensions(beam.PTransform): | ||
"""Open the reference.json in xarray and validate dimensions.""" | ||
|
||
expected_dims: Dict = field(default_factory=dict) | ||
|
||
@staticmethod | ||
def _validate(zarr_store: zarr.storage.FSStore, expected_dims: Dict) -> None: | ||
ds = xr.open_dataset(zarr_store, engine='zarr') | ||
if set(ds.dims) != expected_dims.keys(): | ||
raise ValueError(f'Expected dimensions {expected_dims.keys()}, got {ds.dims}') | ||
for dim, bounds in expected_dims.items(): | ||
if bounds is None: | ||
continue | ||
lo, hi = bounds | ||
actual_lo, actual_hi = round(ds[dim].data.min()), round(ds[dim].data.max()) | ||
if actual_lo != lo or actual_hi != hi: | ||
raise ValueError(f'Expected {dim} range [{lo}, {hi}], got {actual_lo, actual_hi}') | ||
return ds | ||
|
||
def expand( | ||
self, | ||
pcoll: beam.PCollection, | ||
) -> beam.PCollection: | ||
return pcoll | beam.Map(self._validate, expected_dims=self.expected_dims) | ||
|
||
|
||
recipe = ( | ||
beam.Create(pattern.items()) | ||
| OpenWithKerchunk( | ||
remote_protocol='s3' if selected_rel == S3_REL else 'https', | ||
file_type=pattern.file_type, | ||
# lat/lon are around 5k, this is the best option for forcing kerchunk to inline them | ||
inline_threshold=6000, | ||
storage_options=fsspec_open_kwargs, | ||
) | ||
| FilterVars(keep={*pattern.concat_dims, *IDENTICAL_DIMS, *SELECTED_VARS}) | ||
| WriteCombinedReference( | ||
concat_dims=CONCAT_DIMS, | ||
identical_dims=IDENTICAL_DIMS, | ||
store_name=SHORT_NAME, | ||
target_options=fsspec_open_kwargs, | ||
remote_options=fsspec_open_kwargs, | ||
remote_protocol='s3' if selected_rel == S3_REL else 'https', | ||
) | ||
| ConsolidateMetadata() | ||
| ValidateDatasetDimensions(expected_dims={'time': None, 'lat': (-90, 90), 'lon': (-180, 180)}) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
s3fs==2023.10.0 | ||
boto3==1.28.71 | ||
python-cmr==0.9.0 |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This depends on the main branch of pangeo-forge-recipes