Patches and Updates #368

Open: wants to merge 11 commits into main
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
@@ -63,7 +63,6 @@ jobs:
jupyter nbconvert --to notebook --execute docs/tutorials/processing_configuration.ipynb
jupyter nbconvert --to notebook --execute docs/tutorials/process_cas04_multiple_station.ipynb
jupyter nbconvert --to notebook --execute docs/tutorials/synthetic_data_processing.ipynb
jupyter nbconvert --to notebook --execute tests/test_run_on_commit.ipynb
# Replace "notebook.ipynb" with your notebook's filename

# - name: Commit changes (if any)
48 changes: 26 additions & 22 deletions aurora/pipelines/fourier_coefficients.py
@@ -61,7 +61,6 @@
# Imports
# =============================================================================

import mt_metadata.timeseries.time_period
import mth5.mth5
import pathlib

@@ -71,6 +70,7 @@
from loguru import logger
from mth5.mth5 import MTH5
from mth5.utils.helpers import path_or_mth5_object
from mt_metadata.timeseries.time_period import TimePeriod
from mt_metadata.transfer_functions.processing.fourier_coefficients import (
Decimation as FCDecimation,
)
@@ -82,10 +82,10 @@

def fc_decimations_creator(
initial_sample_rate: float,
decimation_factors: Optional[Union[list, None]] = None,
decimation_factors: Optional[list] = None,
max_levels: Optional[int] = 6,
time_period: mt_metadata.timeseries.TimePeriod = None,
) -> list:
time_period: Optional[TimePeriod] = None,
) -> List[FCDecimation]:
"""

Creates mt_metadata FCDecimation objects that parameterize Fourier coefficient decimation levels.
@@ -97,11 +97,12 @@ def fc_decimations_creator(
----------
initial_sample_rate: float
Sample rate of the "level0" data -- usually the sample rate during field acquisition.
decimation_factors: list (or other iterable)
decimation_factors: Optional[list]
The decimation factors that will be applied at each FC decimation level
max_levels: int
max_levels: Optional[int]
The maximum number of decimation levels to allow
time_period:
time_period: Optional[TimePeriod]
Provides the start and end times

Returns
-------
@@ -137,7 +138,7 @@ def fc_decimations_creator(
fc_dec.sample_rate_decimation = current_sample_rate

if time_period:
if isinstance(time_period, mt_metadata.timeseries.time_period.TimePeriod):
if isinstance(time_period, TimePeriod):
fc_dec.time_period = time_period
else:
msg = (
@@ -152,26 +153,26 @@


@path_or_mth5_object
def add_fcs_to_mth5(
m: MTH5, fc_decimations: Optional[Union[list, None]] = None
) -> None:
def add_fcs_to_mth5(m: MTH5, fc_decimations: Optional[Union[str, list]] = None) -> None:
"""
Add Fourier Coefficient Levels to an existing MTH5.

**Notes:**

- This module computes the FCs differently than the legacy aurora pipeline. It uses scipy.signal.spectrogram. There is a test in Aurora to confirm that there are equivalent if we are not using fancy pre-whitening.
- This module computes the FCs differently than the legacy aurora pipeline. It uses scipy.signal.spectrogram.
There is a test in Aurora to confirm that they are equivalent when fancy pre-whitening is not used.

- Nomenclature: "usssr_grouper" is the output of a group-by on unique {survey, station, sample_rate} tuples.

Parameters
----------
m: MTH5 object
The mth5 file, open in append mode.
fc_decimations: Union[str, None, List]
fc_decimations: Optional[Union[str, list]]
This specifies the scheme to use for decimating the time series when building the FC layer.
None: Just use default (something like four decimation levels, decimated by 4 each time say.
String: Controlled Vocabulary, values are a work in progress, that will allow custom definition of the fc_decimations for some common cases. For example, say you have stored already decimated time
None: Use the default (something like four decimation levels, decimated by a factor of 4 at each level, say).
String: Controlled vocabulary; the values are a work in progress and will allow custom definition of
the fc_decimations for some common cases. For example, say you have stored already-decimated time
series; then you simply want the zeroth decimation for each run, because each decimated time series lives
under its own run container and will get its own FCs. This is experimental.
List: (**UNTESTED**) -- This means that the user thought about the decimations that they want to create and is
@@ -263,7 +264,7 @@ def get_degenerate_fc_decimation(sample_rate: float) -> list:

Makes a default fc_decimation list. WIP
This "degnerate" config will only operate on the first decimation level.
This is useful for testing but could be used in future if an MTH5 stored time series in decimation
This is useful for testing. It could also be used in the future if an MTH5 stores time series in decimation
levels already as separate runs.

Parameters
@@ -274,7 +275,8 @@ def get_degenerate_fc_decimation(sample_rate: float) -> list:
Returns
-------
output: list
List has only one element which is of type mt_metadata.transfer_functions.processing.fourier_coefficients.Decimation.
The list has only one element, which is of type FCDecimation, i.e.
mt_metadata.transfer_functions.processing.fourier_coefficients.Decimation.
"""
output = fc_decimations_creator(
sample_rate,
@@ -287,17 +289,19 @@


@path_or_mth5_object
def read_back_fcs(m: Union[MTH5, pathlib.Path, str], mode="r"):
def read_back_fcs(m: Union[MTH5, pathlib.Path, str], mode: str = "r") -> None:
"""
This is mostly a helper function for tests. It was used as a sanity check while debugging the FC files, and

This is a helper function for tests. It was used as a sanity check while debugging the FC files, and
is also a good example of how to access the data at each level for each channel.

The time axis of the FC array will change from level to level, but the frequency axis will stay the same shape
(for now, since all FCs are stored by default).

Args:
m: pathlib.Path, str or an MTH5 object
The path to an h5 file that we will scan the fcs from
Parameters
----------
m: Union[MTH5, pathlib.Path, str]
Either a path to an MTH5 file, or an MTH5 object, from which the FCs will be read back.


"""
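A minimal usage sketch of the FC workflow touched in this file. The file name "example.h5", the sample rate, and the decimation factors are hypothetical placeholders, not values from this PR:

```python
# Sketch of building, storing, and reading back Fourier coefficients.
# "example.h5" and the numeric values below are hypothetical.
from aurora.pipelines.fourier_coefficients import (
    add_fcs_to_mth5,
    fc_decimations_creator,
    read_back_fcs,
)

# Build a decimation scheme: level 0 keeps the raw rate, later levels
# each decimate by a factor of 4.
fc_decimations = fc_decimations_creator(
    initial_sample_rate=1.0,
    decimation_factors=[1, 4, 4, 4],
)

# Both functions below are decorated with @path_or_mth5_object, so an h5
# path can stand in for an open MTH5 object. Note the docstring above marks
# passing an explicit list as **UNTESTED**; fc_decimations=None uses defaults.
add_fcs_to_mth5("example.h5", fc_decimations=fc_decimations)

# Read the FCs back as a sanity check.
read_back_fcs("example.h5")
```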
64 changes: 25 additions & 39 deletions aurora/pipelines/process_mth5.py
@@ -63,9 +63,7 @@
# =============================================================================


def make_stft_objects(
processing_config, i_dec_level, run_obj, run_xrds, units="MT"
):
def make_stft_objects(processing_config, i_dec_level, run_obj, run_xrds, units="MT"):
"""
Operates on a "per-run" basis. Applies STFT to all time series in the input run.

@@ -103,9 +101,7 @@ def make_stft_objects(
].channel_scale_factors
elif run_obj.station_metadata.id == processing_config.stations.remote[0].id:
scale_factors = (
processing_config.stations.remote[0]
.run_dict[run_id]
.channel_scale_factors
processing_config.stations.remote[0].run_dict[run_id].channel_scale_factors
)

stft_obj = calibrate_stft_obj(
@@ -152,9 +148,7 @@ def process_tf_decimation_level(
The transfer function values packed into an object
"""
frequency_bands = config.decimations[i_dec_level].frequency_bands_obj()
transfer_function_obj = TTFZ(
i_dec_level, frequency_bands, processing_config=config
)
transfer_function_obj = TTFZ(i_dec_level, frequency_bands, processing_config=config)
dec_level_config = config.decimations[i_dec_level]
# segment_weights = coherence_weights(dec_level_config, local_stft_obj, remote_stft_obj)
transfer_function_obj = process_transfer_functions(
@@ -183,9 +177,7 @@ def triage_issue_289(local_stfts: list, remote_stfts: list):
for i_chunk in range(n_chunks):
ok = local_stfts[i_chunk].time.shape == remote_stfts[i_chunk].time.shape
if not ok:
logger.warning(
"Mismatch in FC array lengths detected -- Issue #289"
)
logger.warning("Mismatch in FC array lengths detected -- Issue #289")
glb = max(
local_stfts[i_chunk].time.min(),
remote_stfts[i_chunk].time.min(),
@@ -196,18 +188,13 @@
)
cond1 = local_stfts[i_chunk].time >= glb
cond2 = local_stfts[i_chunk].time <= lub
local_stfts[i_chunk] = local_stfts[i_chunk].where(
cond1 & cond2, drop=True
)
local_stfts[i_chunk] = local_stfts[i_chunk].where(cond1 & cond2, drop=True)
cond1 = remote_stfts[i_chunk].time >= glb
cond2 = remote_stfts[i_chunk].time <= lub
remote_stfts[i_chunk] = remote_stfts[i_chunk].where(
cond1 & cond2, drop=True
)
assert (
local_stfts[i_chunk].time.shape
== remote_stfts[i_chunk].time.shape
)
assert local_stfts[i_chunk].time.shape == remote_stfts[i_chunk].time.shape
return local_stfts, remote_stfts
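
For reference, the trimming above can be reproduced in isolation; a minimal sketch with synthetic xarray time axes (all values hypothetical, not data from this PR):

```python
# Sketch of the time-axis intersection that triage_issue_289 applies when
# local and remote STFT chunks have mismatched lengths (Issue #289).
import numpy as np
import xarray as xr

local = xr.DataArray(np.random.rand(5), dims="time", coords={"time": np.arange(5)})
remote = xr.DataArray(np.random.rand(4), dims="time", coords={"time": np.arange(1, 5)})

glb = max(local.time.min(), remote.time.min())  # greatest lower bound
lub = min(local.time.max(), remote.time.max())  # least upper bound

# Keep only the overlapping time window in each array.
local = local.where((local.time >= glb) & (local.time <= lub), drop=True)
remote = remote.where((remote.time >= glb) & (remote.time <= lub), drop=True)
assert local.time.shape == remote.time.shape
```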


@@ -306,9 +293,7 @@ def load_stft_obj_from_mth5(
An STFT from mth5.
"""
station_obj = station_obj_from_row(row)
fc_group = station_obj.fourier_coefficients_group.get_fc_group(
run_obj.metadata.id
)
fc_group = station_obj.fourier_coefficients_group.get_fc_group(run_obj.metadata.id)
fc_decimation_level = fc_group.get_decimation_level(f"{i_dec_level}")
stft_obj = fc_decimation_level.to_xarray(channels=channels)

@@ -369,10 +354,7 @@ def save_fourier_coefficients(dec_level_config, row, run_obj, stft_obj) -> None:
raise NotImplementedError(msg)

# Get FC group (create if needed)
if (
run_obj.metadata.id
in station_obj.fourier_coefficients_group.groups_list
):
if run_obj.metadata.id in station_obj.fourier_coefficients_group.groups_list:
fc_group = station_obj.fourier_coefficients_group.get_fc_group(
run_obj.metadata.id
)
Expand All @@ -393,9 +375,7 @@ def save_fourier_coefficients(dec_level_config, row, run_obj, stft_obj) -> None:
dec_level_name,
decimation_level_metadata=decimation_level_metadata,
)
fc_decimation_level.from_xarray(
stft_obj, decimation_level_metadata.sample_rate
)
fc_decimation_level.from_xarray(stft_obj, decimation_level_metadata.sample_rate)
fc_decimation_level.update_metadata()
fc_group.update_metadata()
else:
@@ -535,9 +515,7 @@ def process_mth5_legacy(
local_merged_stft_obj,
remote_merged_stft_obj,
)
ttfz_obj.apparent_resistivity(
tfk.config.channel_nomenclature, units=units
)
ttfz_obj.apparent_resistivity(tfk.config.channel_nomenclature, units=units)
tf_dict[i_dec_level] = ttfz_obj

if show_plot:
Expand All @@ -549,10 +527,20 @@ def process_mth5_legacy(
tf_dict=tf_dict, processing_config=tfk.config
)

tf_cls = tfk.export_tf_collection(tf_collection)

if z_file_path:
tf_cls.write(z_file_path)
try:
tf_cls = tfk.export_tf_collection(tf_collection)
if z_file_path:
tf_cls.write(z_file_path)
except Exception as e:
msg = "TF collection could not export to mt_metadata TransferFunction\n"
msg += f"Failed with exception {e}\n"
msg += "Perhaps an unconventional mixture of input/output channels was used\n"
msg += f"Input channels were {tfk.config.decimations[0].input_channels}\n"
msg += f"Output channels were {tfk.config.decimations[0].output_channels}\n"
msg += "No z-file will be written in this case\n"
msg += "Will return a legacy TransferFunctionCollection object, not mt_metadata object."
logger.error(msg)
return_collection = True

tfk.dataset.close_mth5s()
if return_collection:
@@ -602,9 +590,7 @@ def process_mth5(
The transfer function object
"""
if processing_type not in SUPPORTED_PROCESSINGS:
raise NotImplementedError(
f"Processing type {processing_type} not supported"
)
raise NotImplementedError(f"Processing type {processing_type} not supported")

if processing_type == "legacy":
try:
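For context, a hypothetical call into the entry point modified above. The config and kernel-dataset objects are placeholders (in real use they come from aurora's configuration and kernel-dataset helpers), and the file name is invented; only processing_type="legacy" is taken from the diff:

```python
# Sketch only: the placeholders below are not constructed here.
from aurora.pipelines.process_mth5 import process_mth5

config = ...       # aurora processing configuration (placeholder)
tfk_dataset = ...  # kernel dataset describing the runs to process (placeholder)

tf_cls = process_mth5(
    config,
    tfk_dataset,
    processing_type="legacy",   # must be one of SUPPORTED_PROCESSINGS
    z_file_path="example.zss",  # with this PR, writing is skipped if TF export fails
)
```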