Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add detrend function #188

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 168 additions & 0 deletions src/array_ops.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ extern "C" {
}

#include <boost/python.hpp>
#include <omp.h>

#include <gsl/gsl_spline.h>
#include <gsl/gsl_statistics.h>

#include <pybindings.h>
#include "so3g_numpy.h"
Expand Down Expand Up @@ -993,6 +996,158 @@ void interp1d_linear(const bp::object & x, const bp::object & y,
}
}

template <typename T>
T _calculate_median(const T* data, const int n)
{
// Copy to prevent overwriting input with gsl median
// Explicitly cast to double here due to gsl
std::vector<double> data_copy(n);
std::transform(data, data + n, data_copy.begin(), [](double val) {
return static_cast<double>(val);
});

// GSL is much faster than a naive std::sort implementation
return gsl_stats_median(data_copy.data(), 1, n);
}

template <typename T>
void _detrend(T* data, const int ndets, const int nsamps, const int row_stride,
const std::string & method, const int linear_ncount,
const int nthreads)
{
if (method == "mean") {
#pragma omp parallel for num_threads(nthreads)
for (int i = 0; i < ndets; ++i) {
int ioff = i * row_stride;

T* data_row = data + ioff;

// This is significantly faster than gsl_stats_mean
T det_mean = 0.;
for (int si = 0; si < nsamps; ++si) {
det_mean += data_row[si];
}

det_mean /= nsamps;

for (int si = 0; si < nsamps; ++si) {
data_row[si] -= det_mean;
}
}
}
else if (method == "median") {
#pragma omp parallel for num_threads(nthreads)
for (int i = 0; i < ndets; ++i) {
int ioff = i * row_stride;

T* data_row = data + ioff;

T det_median = _calculate_median(data_row, nsamps);

for (int si = 0; si < nsamps; ++si) {
data_row[si] -= det_median;
}
}
}
else if (method == "linear") {
// Default ncount
int ncount = linear_ncount;
if (ncount == -1) {
ncount = nsamps / 2;
}

T x[nsamps];
T step = 1.0 / (nsamps - 1);

// Equivalent to np.linspace(0.,1.,nsamp)
for (int si = 0; si < nsamps; ++si) {
x[si] = si * step;
}

ncount = std::max(1, std::min(ncount, nsamps / 2));

int last_offset = nsamps - ncount;

#pragma omp parallel for num_threads(nthreads)
for (int i = 0; i < ndets; ++i) {
int ioff = i * row_stride;

T* data_row = data + ioff;

// Mean of first and last ncount samples
T det_mean_first = 0.;
T det_mean_last = 0.;

for (int si = 0; si < ncount; ++si) {
det_mean_last += data_row[si + last_offset];
det_mean_first += data_row[si];
}

T slope = (det_mean_last - det_mean_first) / ncount;

T det_mean = 0.;
for (int si = 0; si < nsamps; ++si) {
data_row[si] -= slope * x[si];
det_mean += data_row[si];
}

det_mean /= nsamps;
for (int si = 0; si < nsamps; ++si) {
data_row[si] -= det_mean;
}
}
}
else {
throw ValueError_exception("Unupported detrend method. Supported methods "
"are 'mean', 'median', and 'linear'");
}
}

template <typename T>
void _detrend_buffer(bp::object & tod, const std::string & method,
const int linear_ncount)
{
BufferWrapper<T> tod_buf ("tod", tod, false, std::vector<int>{-1, -1});
if (tod_buf->strides[1] != tod_buf->itemsize)
throw ValueError_exception("Argument 'tod' must be contiguous in last axis.");
T* tod_data = (T*)tod_buf->buf;
const int ndets = tod_buf->shape[0];
const int nsamps = tod_buf->shape[1];

int row_stride = tod_buf->strides[0] / sizeof(T);

// _detrend may be called from within a parallel loop internally, so manage
// parallelization explicitly
int nthreads = 1;
#pragma omp parallel
{
#ifdef _OPENMP
if (omp_get_thread_num() == 0)
nthreads = omp_get_num_threads();
#endif
}

// We want _detrend to accept C++ types so it can be used internally
// for Welch psd calculations, hence the hierarchical function calls
_detrend<T>(tod_data, ndets, nsamps, row_stride, method, linear_ncount, nthreads);
}

void detrend(bp::object & tod, const std::string & method, const int linear_ncount)
{
// Get data type
int dtype = get_dtype(tod);

if (dtype == NPY_FLOAT) {
_detrend_buffer<float>(tod, method, linear_ncount);
}
else if (dtype == NPY_DOUBLE) {
_detrend_buffer<double>(tod, method, linear_ncount);
}
else {
throw TypeError_exception("Only float32 or float64 arrays are supported.");
}
}


PYBINDINGS("so3g")
{
Expand Down Expand Up @@ -1097,4 +1252,17 @@ PYBINDINGS("so3g")
" with shape (nsamp_interp,)\n"
" y_interp: interpolated data array (float32/float64) output buffer to be modified "
" with shape (ndet, nsamp_interp)\n");
bp::def("detrend", detrend,
"detrend(tod, method, ncount)"
"\n"
"Detrend each row of an array (float32/float64). This function uses OMP to parallelize "
"over the dets (rows) axis."
"\n"
"Args:\n"
" tod: input array (float32/float64) buffer with shape (ndet, nsamp) that is to be detrended. "
" The data is modified in place.\n"
" method: how to detrend data. Options are 'mean', 'median', and 'linear'. Linear calculates "
" and subtracts the slope from either end of each row as determined from 'linear_ncount'.\n"
" linear_ncount: Number (int) of samples to use, on each end, when measuring mean level for 'linear'"
" detrend. Values larger than 1 suppress the influence of white noise.\n");
}
77 changes: 77 additions & 0 deletions test/test_array_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,5 +260,82 @@ def test_04_array_slicing(self):
np.testing.assert_allclose(scipy_sig, so3g_sig[:,interp_slice_offset:], rtol=tolerance)


class TestDetrend(unittest.TestCase):
"""
Test detrending.
"""

def test_00_mean_detrending(self):
nsamps = 1000
ndets = 3
dtype = "float32"
order = "C"

x = np.linspace(0., 1., nsamps, dtype=dtype)
signal = np.array([(i + 1) * np.sin(2*np.pi*x + i) for i in range(ndets)], dtype=dtype, order=order)

signal_copy = signal.copy(order=order)
signal_copy -= np.mean(signal_copy, axis=-1, dtype=dtype)[..., None]

method = "mean"
count = 0 # not used for mean detrending
so3g.detrend(signal, method, count)

rtol = 0
atol = 1e-5
np.testing.assert_allclose(signal_copy, signal, rtol=rtol, atol=atol)

def test_01_median_detrending(self):
nsamps = 1000
ndets = 3
dtype = "float32"
order = "C"

x = np.linspace(0, 1, nsamps, dtype=dtype)
signal = np.array([(i + 1) * np.sin(2*np.pi*x + i) for i in range(ndets)], dtype=dtype, order=order)

signal_copy = signal.copy(order=order)
signal_copy -= np.median(signal_copy, axis=-1)[..., None]

method = "median"
count = 0 # not used for median detrending
so3g.detrend(signal, method, count)

rtol = 0.
atol = 1e-5
np.testing.assert_allclose(signal_copy, signal, rtol=rtol, atol=atol)

def test_02_linear_detrending(self):
nsamps = 1000
ndets = 10
dtype = "float32"
order = "C"
count = nsamps // 3

x = np.linspace(0., 1., nsamps, dtype=dtype)
signal = np.array([(i + 1) * np.sin(2*np.pi*x + i) for i in range(ndets)], dtype=dtype, order=order)

signal_copy = signal.copy(order=order)

# this is the sotodlib "linear" detrending algorithm copied exactly
count_copy = max(1, min(count, signal_copy.shape[-1] // 2))
slopes = signal_copy[..., -count_copy:].mean(axis=-1, dtype=dtype) - signal[
..., :count_copy
].mean(axis=-1, dtype=dtype)

# ignore shape != 2 case as c++ approach only supports 1D or 2D
for i in range(signal_copy.shape[0]):
signal_copy[i, :] -= slopes[i] * x

signal_copy -= np.mean(signal_copy, axis=-1)[..., None]

method = "linear"
so3g.detrend(signal, method, count)

rtol = 0.
atol = 1e-5
np.testing.assert_allclose(signal_copy, signal, rtol=rtol, atol=atol)


if __name__ == "__main__":
unittest.main()
Loading