"""
General zero-phase filter. Thanks to Ed Gross for most
of this. It's mostly a python translation of matlab code he
provided. (this is a generalization of the code in tidal_filter.py)
"""
from __future__ import print_function
import numpy as np
from scipy.signal.filter_design import butter
import scipy.signal
from scipy.signal import filtfilt, lfilter
import warnings
[docs]def lowpass(data,in_t=None,cutoff=None,order=4,dt=None,axis=-1,causal=False):
"""
data: vector of data
in_t: sample times
cutoff: cutoff period in the same units as in_t
returns vector same as data, but with high frequencies removed
"""
# Step 1: Determine dt from data or from user if specified
if dt is None:
dt=np.median(np.diff(in_t))
dt=float(dt) # make sure it's not an int
cutoff=float(cutoff)
Wn = dt / cutoff
B,A = butter(order, Wn)
if not causal:
# scipy filtfilt triggers some warning message about tuple
# indices.
with warnings.catch_warnings():
warnings.simplefilter("ignore")
data_filtered = filtfilt(B,A,data,axis=axis)
else:
data_filtered = lfilter(B,A,data,axis=axis)
return data_filtered
[docs]def lowpass_gotin(data,in_t_days,*args,**kwargs):
print("Use lowpass_godin() instead of lowpass_gotin()")
return lowpass_godin(data,in_t_days,*args,**kwargs)
[docs]def lowpass_godin(data,in_t_days=None,ends='pass',
mean_dt_h = None,
*args,**kwargs):
""" Approximate Godin's tidal filter
Note that in preserving the length of the dataset, the ends aren't really
valid
data: array suitable to pass to np.convolve
in_t_days: timestamps in decimal days. This is only used to establish
the time step, which is assumed to be constant.
dt_h: directly specify timestep in hours.
ends:
'pass' no special treatment at the ends. The first and last ~37
hours will be contaminated by end-effects.
'nan' will replace potentially contaminated end samples with nan
*args,**kwargs are allowed but ignored. They are present to make it
easier to slip this method in to replace others without having to change
the call signature
"""
if mean_dt_h is None:
mean_dt_h = 24*np.mean(np.diff(in_t_days))
# how many samples are in 24 hours?
N24 = int(round(24. / mean_dt_h))
# and in 25 hours?
N25 = int(round(25. / mean_dt_h))
A24 = np.ones(N24) / float(N24)
A25 = np.ones(N25) / float(N25)
if ends=='nan':
# Add nan at start/end, which will carry through
# the convolution to mark any samples affected
# by the ends
data=np.concatenate( ( [np.nan],data,[np.nan] ) )
data = np.convolve(data,A24,'same')
data = np.convolve(data,A24,'same')
data = np.convolve(data,A25,'same')
if ends=='nan':
data=data[1:-1]
return data
[docs]def lowpass_fir(x,winsize,ignore_nan=True,axis=-1,mode='same',use_fft=False,
nan_weight_threshold=0.49999):
"""
In the absence of exact filtering needs, choose the window
size to match the cutoff period. Signals with a frequency corresponding to
that cutoff period will be attenuated to about 50% of their original
amplitude, corresponding to the -6dB point.
Rolloff is about 18dB/octave, though highly scalloped so it's not as
simple as with a Butterworth filter. That 18dB/octave is roughly the same as
a 3rd order butterworth, but it's a bit unclear on exactly where the 18dB/octave
rolloff holds.
x: ndarray
winsize: integer - how long the hanning window is
axis: the axis along which to apply the filter
mode: same as for scipy.signal convolve operations
use_fft: using the fft is faster, but sometimes less robust
nan_weight_threshold: items with a weight less than this will be marked nan
the default value is slightly less than half, to avoid numerical roundoff
issues with 0.49999999 < 0.5
N.B. In the case of an input with nan only at the beginning and end, to
guarantee that the output will have the same nans as the input winsize must
be odd. Otherwise there can be nan-weights exactly==0.500 on both ends.
"""
# hanning windows have first/last elements==0.
# but it's counter-intuitive - so force a window with nonzero
# elements matching the requested size
win=np.hanning(winsize+2)[1:-1]
win/=win.sum()
if use_fft:
convolve=scipy.signal.fftconvolve
else:
convolve=scipy.signal.convolve
slices=[None]*x.ndim
slices[axis]=slice(None)
win=win[tuple(slices)] # expand to get the right broadcasting
if ignore_nan:
x=x.copy()
valid=np.isfinite(x)
x[~valid]=0.0
result=convolve( x, win, mode)
if ignore_nan:
weights=convolve( valid.astype('f4'),win,mode)
has_weight=(weights>0)
result[has_weight] = result[has_weight] / weights[has_weight]
result[~has_weight]=0 # redundant, but in case of roundoff.
result[weights<nan_weight_threshold]=np.nan
return result
# xarray time series versions:
[docs]def lowpass_xr(da,cutoff,**kw):
"""
Like lowpass(), but ds is a data array with a time coordinate,
and cutoff is a timedelta64.
"""
data=da.values
time_secs=(da.time.values-da.time.values[0])/np.timedelta64(1,'s')
cutoff_secs=cutoff/np.timedelta64(1,'s')
axis=da.get_axis_num('time')
data_lp=lowpass(data,time_secs,cutoff_secs,axis=axis,**kw)
da_lp=da.copy(deep=True)
da_lp.values[:]=data_lp
da_lp.attrs['comment']="lowpass at %g seconds"%(cutoff_secs)
return da_lp