diff --git a/src/array_ops.cxx b/src/array_ops.cxx index d6dec7f..2319e9e 100644 --- a/src/array_ops.cxx +++ b/src/array_ops.cxx @@ -15,7 +15,10 @@ extern "C" { } #include +#include + #include +#include #include #include "so3g_numpy.h" @@ -993,6 +996,158 @@ void interp1d_linear(const bp::object & x, const bp::object & y, } } +template +T _calculate_median(const T* data, const int n) +{ + // Copy to prevent overwriting input with gsl median + // Explicitly cast to double here due to gsl + std::vector data_copy(n); + std::transform(data, data + n, data_copy.begin(), [](double val) { + return static_cast(val); + }); + + // GSL is much faster than a naive std::sort implementation + return gsl_stats_median(data_copy.data(), 1, n); +} + +template +void _detrend(T* data, const int ndets, const int nsamps, const int row_stride, + const std::string & method, const int linear_ncount, + const int nthreads) +{ + if (method == "mean") { + #pragma omp parallel for num_threads(nthreads) + for (int i = 0; i < ndets; ++i) { + int ioff = i * row_stride; + + T* data_row = data + ioff; + + // This is significantly faster than gsl_stats_mean + T det_mean = 0.; + for (int si = 0; si < nsamps; ++si) { + det_mean += data_row[si]; + } + + det_mean /= nsamps; + + for (int si = 0; si < nsamps; ++si) { + data_row[si] -= det_mean; + } + } + } + else if (method == "median") { + #pragma omp parallel for num_threads(nthreads) + for (int i = 0; i < ndets; ++i) { + int ioff = i * row_stride; + + T* data_row = data + ioff; + + T det_median = _calculate_median(data_row, nsamps); + + for (int si = 0; si < nsamps; ++si) { + data_row[si] -= det_median; + } + } + } + else if (method == "linear") { + // Default ncount + int ncount = linear_ncount; + if (ncount == -1) { + ncount = nsamps / 2; + } + + T x[nsamps]; + T step = 1.0 / (nsamps - 1); + + // Equivalent to np.linspace(0.,1.,nsamp) + for (int si = 0; si < nsamps; ++si) { + x[si] = si * step; + } + + ncount = std::max(1, std::min(ncount, nsamps / 2)); + + int last_offset = nsamps - ncount; + + #pragma omp parallel for num_threads(nthreads) + for (int i = 0; i < ndets; ++i) { + int ioff = i * row_stride; + + T* data_row = data + ioff; + + // Mean of first and last ncount samples + T det_mean_first = 0.; + T det_mean_last = 0.; + + for (int si = 0; si < ncount; ++si) { + det_mean_last += data_row[si + last_offset]; + det_mean_first += data_row[si]; + } + + T slope = (det_mean_last - det_mean_first) / ncount; + + T det_mean = 0.; + for (int si = 0; si < nsamps; ++si) { + data_row[si] -= slope * x[si]; + det_mean += data_row[si]; + } + + det_mean /= nsamps; + for (int si = 0; si < nsamps; ++si) { + data_row[si] -= det_mean; + } + } + } + else { + throw ValueError_exception("Unupported detrend method. Supported methods " + "are 'mean', 'median', and 'linear'"); + } +} + +template +void _detrend_buffer(bp::object & tod, const std::string & method, + const int linear_ncount) +{ + BufferWrapper tod_buf ("tod", tod, false, std::vector{-1, -1}); + if (tod_buf->strides[1] != tod_buf->itemsize) + throw ValueError_exception("Argument 'tod' must be contiguous in last axis."); + T* tod_data = (T*)tod_buf->buf; + const int ndets = tod_buf->shape[0]; + const int nsamps = tod_buf->shape[1]; + + int row_stride = tod_buf->strides[0] / sizeof(T); + + // _detrend may be called from within a parallel loop internally, so manage + // parallelization explicitly + int nthreads = 1; + #pragma omp parallel + { + #ifdef _OPENMP + if (omp_get_thread_num() == 0) + nthreads = omp_get_num_threads(); + #endif + } + + // We want _detrend to accept C++ types so it can be used internally + // for Welch psd calculations, hence the hierarchical function calls + _detrend(tod_data, ndets, nsamps, row_stride, method, linear_ncount, nthreads); +} + +void detrend(bp::object & tod, const std::string & method, const int linear_ncount) +{ + // Get data type + int dtype = get_dtype(tod); + + if (dtype == NPY_FLOAT) { + _detrend_buffer(tod, method, linear_ncount); + } + else if (dtype == NPY_DOUBLE) { + _detrend_buffer(tod, method, linear_ncount); + } + else { + throw TypeError_exception("Only float32 or float64 arrays are supported."); + } +} + PYBINDINGS("so3g") { @@ -1097,4 +1252,17 @@ PYBINDINGS("so3g") " with shape (nsamp_interp,)\n" " y_interp: interpolated data array (float32/float64) output buffer to be modified " " with shape (ndet, nsamp_interp)\n"); + bp::def("detrend", detrend, + "detrend(tod, method, ncount)" + "\n" + "Detrend each row of an array (float32/float64). This function uses OMP to parallelize " + "over the dets (rows) axis." + "\n" + "Args:\n" + " tod: input array (float32/float64) buffer with shape (ndet, nsamp) that is to be detrended. " + " The data is modified in place.\n" + " method: how to detrend data. Options are 'mean', 'median', and 'linear'. Linear calculates " + " and subtracts the slope from either end of each row as determined from 'linear_ncount'.\n" + " linear_ncount: Number (int) of samples to use, on each end, when measuring mean level for 'linear'" + " detrend. Values larger than 1 suppress the influence of white noise.\n"); } \ No newline at end of file diff --git a/test/test_array_ops.py b/test/test_array_ops.py index a755365..5bfcc8c 100644 --- a/test/test_array_ops.py +++ b/test/test_array_ops.py @@ -260,5 +260,82 @@ def test_04_array_slicing(self): np.testing.assert_allclose(scipy_sig, so3g_sig[:,interp_slice_offset:], rtol=tolerance) +class TestDetrend(unittest.TestCase): + """ + Test detrending. + """ + + def test_00_mean_detrending(self): + nsamps = 1000 + ndets = 3 + dtype = "float32" + order = "C" + + x = np.linspace(0., 1., nsamps, dtype=dtype) + signal = np.array([(i + 1) * np.sin(2*np.pi*x + i) for i in range(ndets)], dtype=dtype, order=order) + + signal_copy = signal.copy(order=order) + signal_copy -= np.mean(signal_copy, axis=-1, dtype=dtype)[..., None] + + method = "mean" + count = 0 # not used for mean detrending + so3g.detrend(signal, method, count) + + rtol = 0 + atol = 1e-5 + np.testing.assert_allclose(signal_copy, signal, rtol=rtol, atol=atol) + + def test_01_median_detrending(self): + nsamps = 1000 + ndets = 3 + dtype = "float32" + order = "C" + + x = np.linspace(0, 1, nsamps, dtype=dtype) + signal = np.array([(i + 1) * np.sin(2*np.pi*x + i) for i in range(ndets)], dtype=dtype, order=order) + + signal_copy = signal.copy(order=order) + signal_copy -= np.median(signal_copy, axis=-1)[..., None] + + method = "median" + count = 0 # not used for median detrending + so3g.detrend(signal, method, count) + + rtol = 0. + atol = 1e-5 + np.testing.assert_allclose(signal_copy, signal, rtol=rtol, atol=atol) + + def test_02_linear_detrending(self): + nsamps = 1000 + ndets = 10 + dtype = "float32" + order = "C" + count = nsamps // 3 + + x = np.linspace(0., 1., nsamps, dtype=dtype) + signal = np.array([(i + 1) * np.sin(2*np.pi*x + i) for i in range(ndets)], dtype=dtype, order=order) + + signal_copy = signal.copy(order=order) + + # this is the sotodlib "linear" detrending algorithm copied exactly + count_copy = max(1, min(count, signal_copy.shape[-1] // 2)) + slopes = signal_copy[..., -count_copy:].mean(axis=-1, dtype=dtype) - signal[ + ..., :count_copy + ].mean(axis=-1, dtype=dtype) + + # ignore shape != 2 case as c++ approach only supports 1D or 2D + for i in range(signal_copy.shape[0]): + signal_copy[i, :] -= slopes[i] * x + + signal_copy -= np.mean(signal_copy, axis=-1)[..., None] + + method = "linear" + so3g.detrend(signal, method, count) + + rtol = 0. + atol = 1e-5 + np.testing.assert_allclose(signal_copy, signal, rtol=rtol, atol=atol) + + if __name__ == "__main__": unittest.main()