"""
Signal-processing functions.
Functions for pre-processing signals (e.g. filtering, cross-correlation).
Mostly wrappers around numpy/scipy functions, but with some sane defaults and
calculation of required values (e.g. Nyquist frequency and associated cutoff).
"""
from __future__ import print_function, division
import math
import matplotlib.pyplot as plt
import numpy as np
import sensormotion.plot
import scipy.linalg as la
from scipy.signal import butter, filtfilt
[docs]def baseline(y, deg=None, max_it=None, tol=None):
"""
Computes the baseline of a given data.
Iteratively performs a polynomial fitting in the data to detect its
baseline. At every iteration, the fitting weights on the regions with
peaks are reduced to identify the baseline only.
Parameters
----------
y : ndarray
Data to detect the baseline.
deg : int
Degree of the polynomial that will estimate the data baseline. A low
degree may fail to detect all the baseline present, while a high
degree may make the data too oscillatory, especially at the edges.
max_it : int
Maximum number of iterations to perform.
tol : float
Tolerance to use when comparing the difference between the current
fit coefficients and the ones from the last iteration. The iteration
procedure will stop when the difference between them is lower than
*tol*.
Returns
-------
baseline : ndarray
Array with the baseline amplitude for every original point in *y*
"""
# for not repeating ourselves in `envelope`
if deg is None:
deg = 3
if max_it is None:
max_it = 100
if tol is None:
tol = 1e-3
order = deg + 1
coeffs = np.ones(order)
# try to avoid numerical issues
cond = math.pow(y.max(), 1.0 / order)
x = np.linspace(0.0, cond, y.size)
base = y.copy()
vander = np.vander(x, order)
vander_pinv = la.pinv2(vander)
for _ in range(max_it):
coeffs_new = np.dot(vander_pinv, y)
if la.norm(coeffs_new - coeffs) / la.norm(coeffs) < tol:
break
coeffs = coeffs_new
base = np.dot(vander, coeffs)
y = np.minimum(y, base)
return base
[docs]def build_filter(frequency, sample_rate, filter_type, filter_order):
"""
Build a butterworth filter with specified parameters.
Calculates the Nyquist frequency and associated frequency cutoff, and
builds a Butterworth filter from the parameters.
Parameters
----------
frequency : int or tuple of ints
The cutoff frequency for the filter. If `filter_type` is set as
'bandpass' then this needs to be a tuple of integers representing
the lower and upper bound frequencies. For example, for a bandpass
filter with range of 2Hz and 10Hz, you would pass in the tuple (2, 10).
For filter types with a single cutoff frequency then a single integer
should be used.
sample_rate : float
Sampling rate of the signal.
filter_type : {'lowpass', 'highpass', 'bandpass', 'bandstop'}
Type of filter to build.
filter_order: int, optional
Order of the filter.
Returns
-------
b : ndarray
Numerator polynomials of the IIR filter.
a : ndarray
Denominator polynomials of the IIR filter.
"""
nyq = 0.5 * sample_rate
if filter_type == "bandpass":
nyq_cutoff = (frequency[0] / nyq, frequency[1] / nyq)
else:
nyq_cutoff = frequency / nyq
b, a = butter(filter_order, nyq_cutoff, btype=filter_type, analog=False)
return b, a
[docs]def detrend_signal(signal, degree):
"""Detrend a signal.
Detrends a signal using a polynomial fit with the specified degree.
Parameters
----------
signal : ndarray
Signal values to detrend.
degree : int
Degree of the polynomial that will estimate the data baseline. A low
degree may fail to detect all the baseline present, while a high
degree may make the data too oscillatory, especially at the edges. A
value of 0 will not apply any baseline detrending. The baseline for
detrending is calculated by :func:`sensormotion.signal.baseline`.
Returns
-------
detrended_signal : ndarray
Detrended form of the original signal.
"""
signal_baseline = baseline(signal, deg=degree)
return signal - signal_baseline
[docs]def fft(signal, sampling_rate, plot=False, show_grid=True, fig_size=(10, 5)):
"""
Perform FFT on signal.
Compute 1D Discrete Fourier Transform using Fast Fourier Transform.
Optionally, plot the power spectrum of the frequency domain.
Parameters
----------
signal : ndarray
Input array to be transformed.
sampling_rate : float
Sampling rate of the input signal.
plot : bool, optional
Toggle to display a plot of the power spectrum.
show_grid : bool, optional
If creating a plot, toggle to show grid lines on the figure.
fig_size : tuple, optional
If plotting, set the width and height of the resulting figure.
Returns
-------
signal_fft : ndarray
Transformation of the original input signal.
"""
n = len(signal)
t = 1.0 / sampling_rate
time = range(n) # Time vector
xf = np.linspace(0.0, 1.0 / (2.0 * t), n // 2)
yf = np.fft.fft(signal) / n # FFT and normalize
if plot:
f, axarr = plt.subplots(2, 1, figsize=fig_size)
axarr[0].plot(time, signal)
axarr[0].set_xlim(min(time), max(time))
axarr[0].set_xlabel("Time Steps")
axarr[0].set_ylabel("Amplitude")
axarr[0].grid(show_grid)
axarr[1].plot(xf, abs(yf[0 : n // 2]), "r") # Plot the spectrum
axarr[1].set_xlabel("Freq (Hz)")
axarr[1].set_ylabel("|Y(freq)|")
axarr[1].grid(show_grid)
f.subplots_adjust(hspace=0.5)
plt.suptitle("Power Spectrum", size=16)
plt.show()
return yf
[docs]def filter_signal(b, a, signal):
"""
Filter a signal.
Simple wrapper around :func:`scipy.signal.filtfilt` to apply a
foward-backward filter to preserve phase of the input. Requires the
numerator and denominator polynomials from
:func:`sensormotion.signal.build_filter`.
Parameters
----------
b : ndarray
Numerator polynomial coefficients of the filter.
a : ndarray
Denominator polynomial coefficients of the filter.
signal : ndarray
Input array to be filtered.
Returns
-------
signal_filtered : ndarray
Filtered output of the original input signal.
"""
return filtfilt(b, a, signal)
[docs]def rectify_signal(
signal, rectifier_type="full", plot=False, show_grid=True, fig_size=(10, 5)
):
"""
Rectify a signal.
Run a signal through a full or half-wave rectifier. Optionally plot the
resulting signal.
Parameters
----------
signal : ndarray
Input signal to be rectified.
rectifier_type : {'full', 'half'}, optional
Type of rectifier to use. Full-wave rectification turns all negative
values into positive ones. Half-wave rectification sets all negative
values to zero.
plot : bool, optional
Toggle to display a plot of the rectified signal.
show_grid : bool, optional
If creating a plot, toggle to show grid lines on the figure.
fig_size : tuple, optional
If plotting, set the width and height of the resulting figure.
Returns
-------
output : ndarray
Rectified signal.
"""
if rectifier_type == "half":
output = signal * (signal > 0)
elif rectifier_type == "full":
output = np.abs(signal)
if plot:
f, ax = plt.subplots(1, 1, figsize=fig_size)
time = np.arange(len(signal))
ax.plot(time, signal, color="k", linewidth=1, alpha=0.5, label="Original")
ax.plot(time, output, color="r", linewidth=0.9, label="Rectified")
ax.set_xlim(min(time), max(time))
ax.grid(show_grid)
ax.legend()
plt.suptitle("Rectified Signal ({})".format(rectifier_type), size=16)
plt.show()
return output
[docs]def vector_magnitude(*args):
"""
Calculate the vector magnitude/euclidean norm of multiple vectors.
Given an arbitrary number of input vectors, calculate the vector
magnitude/euclidean norm using the Pythagorean theorem.
Parameters
----------
*args : ndarray
Each parameter is a numpy array representing a single vector. Multiple
vectors can be passed in, for example, `vector_magnitude(x, y, z)`
Returns
-------
vm : ndarray
Vector magnitude across all input vectors.
"""
n = len(args[0])
assert all(len(x) == n for x in args), "Vectors have different lengths"
vm = np.sqrt(sum(x ** 2 for x in args))
return vm
[docs]def xcorr(x, y, scale="none", plot=False, show_grid=True, fig_size=(10, 5)):
"""
Cross-correlation between two 1D signals.
Calculate the cross-correlation between two signals for all time lags
(forwards and backwards). If the inputs are different lengths, zeros will
be appended to the shorter input.
All 4 scaling options (`none`, `biased`, `unbiased`, and `coeff`)
reproduce the output from MATLAB's `xcorr()` function.
Optionally, plots can be created to visualize the cross-correlation values
at each lag.
Parameters
----------
x : ndarray
First input signal.
y : ndarray
Second input signal. Pass in `x` again for autocorrelation.
scale : {'none', 'biased', 'unbiased', 'coeff'}, optional
Scaling options for the cross-correlation values. Replicates MATLAB's
options for scaling.
plot : bool, optional
Toggle to display a plot of the cross-correlations.
show_grid : bool, optional
If creating a plot, toggle to show grid lines on the figure.
fig_size : tuple, optional
If plotting, set the width and height of the resulting figure.
Returns
-------
corr : ndarray
Cross-correlation values.
lags : ndarray
Lags for the cross-correlations.
"""
x = np.array(x)
y = np.array(y)
# Pad shorter array if signals are different lengths
if x.size > y.size:
pad_amount = x.size - y.size
y = np.append(y, np.repeat(0, pad_amount))
elif y.size > x.size:
pad_amount = y.size - x.size
x = np.append(x, np.repeat(0, pad_amount))
corr = np.correlate(x, y, mode="full")
lags = np.arange(-(x.size - 1), x.size)
# Scale the correlation values
# Equivalent to xcorr scaling options in MATLAB
if scale == "biased":
corr = corr / x.size
elif scale == "unbiased":
corr /= x.size - abs(lags)
elif scale == "coeff":
corr /= np.sqrt(np.dot(x, x) * np.dot(y, y))
if plot:
sensormotion.plot.plot_signal(
lags,
corr,
title="Cross-correlation (scale: {})".format(scale),
xlab="Lag",
ylab="Correlation",
show_grid=show_grid,
fig_size=fig_size,
)
return corr, lags