Source code for sensormotion.peak

Peak detection algorithms.

This modules contains functions for detecting peaks and valleys in signals.
Signals can also be detrended by estimating a baseline prior to peak

Based on work by
`Lucas Hermann Negri <>`_.

from __future__ import print_function, division

import sensormotion.signal
import matplotlib.pyplot as plt
import numpy as np

[docs]def find_peaks( time, signal, peak_type="peak", min_val=0.5, min_dist=25, detrend=0, plot=False, show_grid=True, fig_size=(10, 5), ): """ Find peaks in a signal. Calculate and return the peaks and/or valleys of a signal. Can optionally detrend a signal before peak detection. A plot can be created that displays the original signal with overlaid peaks and valleys. Parameters ---------- time : ndarray Time vector (X-axis) component of the input signal. signal : ndarray Value (Y-axis) of the signal over time. peak_type : {'peak', 'valley', 'both'}, optional Type of peaks to be detected. `peak` will return positive peaks. `valley` will return negative peaks. `both` will return both peaks and valleys. Peak indices are calculated by calling :func:`sensormotion.peak.indexes`. min_val : float between [0., 1.], optional Normalized threshold. Only the peaks with amplitude higher than the threshold will be detected. min_dist : int, optional Minimum distance between each detected peak. The peak with the highest amplitude is preferred to satisfy this constraint. detrend : int, optional Degree of the polynomial that will estimate the data baseline. A low degree may fail to detect all the baseline present, while a high degree may make the data too oscillatory, especially at the edges. A value of 0 will not apply any baseline detrending. The baseline for detrending is calculated by :func:`sensormotion.signal.baseline`. plot : bool, optional Toggle to create a plot of the signal with peaks/valleys overlaid. show_grid : bool, optional If creating a plot, toggle to show grid lines fig_size : tuple, optional If creating a plot, set the width and height of the resulting figure. Returns ------- peak_times : ndarray Array containing the time of each peak. peak_values : ndarray Array containing the value of each peak. signal_detrended : ndarray, optional If detrend has been selected (`detrend` > 0), an additional array is returned containing the detrended signal. """ time = np.array(time) signal = np.array(signal) # Check for detrend if detrend == 0: # No detrending - don't calculate baseline new_signal = signal else: # Detrend the signal new_signal = sensormotion.signal.detrend_signal(signal, detrend) # Check peak type if peak_type == "peak": # Original input signal peaks = indexes(new_signal, thres=min_val, min_dist=min_dist) elif peak_type == "valley": # Flip the input signal for valleys peaks = indexes(np.negative(new_signal), thres=min_val, min_dist=min_dist) elif peak_type == "both": peaks = indexes(new_signal, thres=min_val, min_dist=min_dist) valleys = indexes(np.negative(new_signal), thres=min_val, min_dist=min_dist) peaks = np.sort(np.append(peaks, valleys)) if plot: if detrend == 0: f, axarr = plt.subplots(1, 1, figsize=fig_size) axarr.plot(time, signal, "k") axarr.plot( time[peaks], signal[peaks], "r+", ms=15, mew=2, label="{} peaks".format(len(peaks)), ) axarr.set_xlim(min(time), max(time)) axarr.set_xlabel("Time") axarr.grid(show_grid) axarr.legend(loc="lower right") else: f, axarr = plt.subplots(2, 1, figsize=fig_size) axarr[0].plot(time, signal, "k") axarr[0].title.set_text("Original") axarr[0].set_xlim(min(time), max(time)) axarr[0].set_xlabel("Time") axarr[0].grid(show_grid) axarr[1].plot(time, new_signal, "k") axarr[1].plot( time[peaks], new_signal[peaks], "r+", ms=15, mew=2, label="{} peaks".format(len(peaks)), ) axarr[1].title.set_text("Detrended (degree: {})".format(detrend)) axarr[1].set_xlim(min(time), max(time)) axarr[1].set_xlabel("Time") axarr[1].grid(show_grid) axarr[1].legend(loc="lower right") f.subplots_adjust(hspace=0.5) suptitle_string = "Peak Detection (val: {}, dist: {})" plt.suptitle(suptitle_string.format(min_val, min_dist), y=1.01, size=16) if detrend == 0: return time[peaks], signal[peaks] else: return time[peaks], new_signal[peaks], new_signal
[docs]def indexes(y, thres=0.3, min_dist=1): """ Peak detection routine. Finds the numeric index of the peaks in *y* by taking its first order difference. By using *thres* and *min_dist* parameters, it is possible to reduce the number of detected peaks. *y* must be signed. Parameters ---------- y : ndarray (signed) 1D amplitude data to search for peaks. thres : float between [0., 1.] Normalized threshold. Only the peaks with amplitude higher than the threshold will be detected. min_dist : int Minimum distance between each detected peak. The peak with the highest amplitude is preferred to satisfy this constraint. Returns ------- peak_indexes : ndarray Array containing the numeric indexes of the peaks that were detected """ if isinstance(y, np.ndarray) and np.issubdtype(y.dtype, np.unsignedinteger): raise ValueError("y must be signed") thres = thres * (np.max(y) - np.min(y)) + np.min(y) min_dist = int(min_dist) # compute first order difference dy = np.diff(y) # propagate left and right values successively to fill all # plateau pixels (0-value) zeros, = np.where(dy == 0) # check if the signal is totally flat if len(zeros) == len(y) - 1: return np.array([]) while len(zeros): # add pixels 2 by 2 to propagate left and right value onto the # zero-value pixel zerosr = np.hstack([dy[1:], 0.0]) zerosl = np.hstack([0.0, dy[:-1]]) # replace 0 with right value if non zero dy[zeros] = zerosr[zeros] zeros, = np.where(dy == 0) # replace 0 with left value if non zero dy[zeros] = zerosl[zeros] zeros, = np.where(dy == 0) # find the peaks by using the first order difference peaks = np.where( (np.hstack([dy, 0.0]) < 0.0) & (np.hstack([0.0, dy]) > 0.0) & (y > thres) )[0] # handle multiple peaks, respecting the minimum distance if peaks.size > 1 and min_dist > 1: highest = peaks[np.argsort(y[peaks])][::-1] rem = np.ones(y.size, dtype=bool) rem[peaks] = False for peak in highest: if not rem[peak]: sl = slice(max(0, peak - min_dist), peak + min_dist + 1) rem[sl] = True rem[peak] = False peaks = np.arange(y.size)[~rem] return peaks