Source code for stompy.nanpsd

import numpy as np
try:
    from numpy import nanmean
except ImportError:
    # obsolete location
    from scipy.stats import nanmean

# initial implementation 90ms for 1132 sample window,
# compared to 0.258ms for holey_psd
# changing to nanmean -> 6ms.
# mean takes just 3ms.

[docs]def nan_detrend(x,order=1): valid=np.isfinite(x) coord=np.nonzero(valid)[0] order=min(order,len(coord)-1) if order>=0: coeffs=np.polyfit(coord,x[valid],order) return x-np.polyval(coeffs,np.arange(len(x))) else: return x
[docs]def autocorrcoef(x,maxlags=None): N = len(x) #default lag is N-1 if maxlags == None: maxlags = N - 1 else: assert maxlags < N, 'lag must be less than len(x)' assert(np.isrealobj(x)) #create an autocorrelation array with same length as lag r = np.zeros(1+maxlags, dtype=float) x=x-np.nanmean(x) for k in range(0, maxlags+1): nk = N - k - 1 a=x[0:nk+1] b=x[k:k+nk+1] valid=np.isfinite(a*b) denom=np.sqrt( np.mean( a[valid]**2 ) ) * np.sqrt( (b[valid]**2).mean() ) r[k]=np.mean( a[valid]*b[valid] ) / denom return r
[docs]def autocovariance(x, maxlags=None, normalize=None): """ Correlation function - similar to xcorr. Copied from spectrum.correlation.CORRELATION, and simplified for the present application. assumes autocorrelation, not cross-correlation. norm is unbiased,x is real. By default this is autocovariance. normalize: 'corr' - divide by var(x). 'corrcoef' - calculate a correlation coefficient for each lag """ N = len(x) if np.all(np.isfinite(x)): my_mean=np.mean else: my_mean=nanmean #default lag is N-1 if maxlags == None: maxlags = N - 1 else: assert maxlags < N, 'lag must be less than len(x)' assert(np.isrealobj(x)) #create an autocorrelation array with same length as lag r = np.zeros(1+maxlags, dtype=float) for k in range(0, maxlags+1): nk = N - k - 1 # for an unbiased estimate would have to get fancier, # counting the number of missing samples. # for biased, though, it's easy: # biased - doesn't really make a big difference if normalize=='corrcoef': valid=np.isfinite(x[0:nk+1]*x[k:k+nk+1]) r[k]=np.corrcoef(x[0:nk+1],x[k:k+nk+1])[1,0] else: r[k]=my_mean(x[0:nk+1]*x[k:k+nk+1]) if normalize=='corr': r/=r[0] return r
[docs]def correlogrampsd(X, lag, NFFT=None): """ PSD estimate using correlogram method. taken from spectrum, simplified for real-valued autocorrelation """ N = len(X) assert lag<N, 'lag must be < size of input data' if NFFT == None: NFFT = N psd = np.zeros(NFFT, dtype=complex) # Window should be centered around zero. Moreover, we want only the # positive values. So, we need to use 2*lag + 1 window and keep values on # the right side. w = np.hanning(2*lag+1)[lag+1:] # compute the cross covariance rxy = autocovariance(X, lag) # keep track of the first elt. psd[0] = rxy[0] # create the first part of the PSD psd[1:lag+1] = rxy[1:] * w # create the second part. psd[-1:NFFT-lag-1:-1] = rxy[1:].conjugate() * w # real, and not abs?? # probably because this is the spectrum of the autocorrelation - # the phase is very important psd = np.real(np.fft.fft(psd)) return psd
[docs]def psd_correl(data,Fs=1,NFFT=None,scale_by_freq=True,lag=None,detrend=1): """ a mlab.psd workalike, but based on the correlogram. """ lag=lag or len(data)/10 if detrend is not None: data=nan_detrend(data,order=detrend) Pxx=correlogrampsd(X=data,lag=lag,NFFT=NFFT) NFFT=len(Pxx) # since real valued: # though this is probably where it should *not* get flipped dc_comp=Pxx[0] Pxx = Pxx[NFFT/2:]*2 Pxx[0] /= 2. Pxx = np.append(Pxx, dc_comp) # putting the DC part at the end?? if scale_by_freq: Pxx /= Fs N=NFFT # not entirely sure about this... df=float(Fs)/N kx=df*np.arange(len(Pxx)) # was 0,1+np.floor(N/2.0) # in a few cases, it seems that we get negative values, # at least in the spectrum code on which this is based # (usually for very sparse signals). # Also, as written, Pxx is backwards, with the DC component # at the last index. Pxx=np.abs(Pxx[::-1]) return Pxx,kx
# input size 1131 # kx 566 # Pxx: 567 # Pxx from correlogrampsd is 1131 long