#!/usr/bin/env python
# xcorr.py ---
# Copyright (C) 2012 Copyright (C) 2012 Phillip Cloud <cpcloud@gmail.com>
# Author: Phillip Cloud <cpcloud@gmail.com>
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
from pandas import Series, DataFrame
from six.moves import xrange
from span.utils import get_fft_funcs, isvector, nextpow2, compose
from span.utils import create_repeating_multi_index, _diag_inds_n
from span.xcorr._mult_mat_xcorr import _mult_mat_xcorr_parallel
def _mult_mat_xcorr_cython_parallel(X, Xc, c, n):
"""Perform the necessary matrix-vector multiplication and fill the cross-
correlation array. Slightly faster than pure Python.
Parameters
----------
X, Xc, c : c16[:, :]
n : ip
Raises
------
AssertionError
If n <= 0 or nx <= 0
"""
nx = c.shape[1]
_mult_mat_xcorr_parallel(X, Xc, c, n, nx)
def _mult_mat_xcorr_python(X, Xc, c, n):
"""Perform the necessary matrix-vector multiplication and fill the cross-
correlation array. Slightly slower than cython.
Parameters
----------
X, Xc, c : c16[:, :]
n : ip
Raises
------
AssertionError
If n <= 0 or nx <= 0
"""
for i in xrange(n):
c[i * n:(i + 1) * n] = X[i] * Xc
def _mult_mat_xcorr(X, Xc):
"""Perform the necessary matrix-vector multiplication and fill the cross-
correlation array. Slightly faster than pure Python.
Parameters
----------
X, Xc, c : c16[:, :]
n : ip
Raises
------
AssertionError
If n <= 0 or nx <= 0
"""
assert X is not None, '1st argument "X" must not be None'
assert Xc is not None, '2nd argument "Xc" must not be None'
n, nx = X.shape
c = np.empty((n * n, nx), dtype=X.dtype)
_mult_mat_xcorr_cython_parallel(X, Xc, c, n)
return c
def _autocorr(x, nfft):
"""Compute the autocorrelation of `x` using a FFT.
Parameters
----------
x : array_like
Input array.
nfft : int
Number of FFT points.
Returns
-------
r : array_like
The autocorrelation of `x`.
"""
ifft, fft = get_fft_funcs(x)
a = np.abs(fft(x, nfft))
a *= a
return ifft(a, nfft)
def _crosscorr(x, y, nfft):
"""Compute the cross correlation of `x` and `y` using an FFT.
Parameters
----------
x, y : array_like
The arrays of which to compute the cross correlation.
nfft : int
The number of fft points.
Returns
-------
c : array_like
Cross correlation of `x` and `y`.
"""
ifft, fft = get_fft_funcs(x, y)
return ifft(fft(x, nfft) * fft(y, nfft).conj(), nfft)
def _matrixcorr(x, nfft):
"""Cross-correlation of the columns of a matrix.
Parameters
----------
x : array_like
The matrix from which to compute the cross correlations of each column
with the others
nfft : int
The number of points used to compute the FFT (faster when this number
is a power of 2).
Returns
-------
c : array_like
The cross correlation of the columns `x`.
"""
_, n = x.shape
ifft, fft = get_fft_funcs(x)
X = fft(x.T, nfft)
Xc = X.conj()
c = _mult_mat_xcorr(X, Xc)
return ifft(c, nfft).T
def _unbiased(c, x, y, lags, lsize):
r"""Compute an unbiased estimate of `c`.
This function returns `c` scaled by the number of data points
available at each lag.
Parameters
----------
c : array_like
The cross correlation array
x, y : array_like
lags : array_like
The lags array, e.g., :math:`\left[\ldots, -2, -1, 0, 1, 2,
\ldots\right]`
lsize : int
The size of the largest of the inputs to the cross correlation
function.
Returns
-------
c : array_like
The unbiased estimate of the cross correlation.
"""
# max number of observations minus observations at each lag
d = lsize - np.abs(lags)
# protect divison by zero
d[np.logical_not(d)] = 1.0
# make the denominator repeat over the correct dimension
denom = np.tile(d[:, np.newaxis], (1, c.shape[1])) if c.ndim == 2 else d
return c / denom
def _biased(c, x, y, lags, lsize):
"""Compute a biased estimate of `c`.
Parameters
----------
c : array_like
The unscaled cross correlation array
x, y, lags : array_like
Unused; here to keep the API sane
lsize : int
The size of the largest of the inputs to the cross correlation
function.
Returns
-------
csc : array_like
The biased estimate of the cross correlation.
Notes
-----
Conceptually, when you choose this scaling procedure you are
ignoring the fact that there is a different amount of data at each
of the different lags, thus this procedure is called "biased".
Only the lag 0 cross-correlation is an unbiased estimate of
thecross-correlation function.
See Also
--------
span.xcorr.xcorr.unbiased
span.xcorr.xcorr.normalize
"""
return c / lsize
def _normalize(c, x, y, lags, lsize):
"""Normalize `c` by the lag 0 cross correlation
Parameters
----------
c : array_like
The cross correlation array to normalize
x : array_like
y : array_like
lags : array_like
lsize : int
The size of the largest of the inputs to the cross correlation
function
Raises
------
AssertionError
* If `c` is not a 1D or 2D array
Returns
-------
c : array_like
The normalized cross correlation.
"""
assert c.ndim in (1, 2), ('invalid number of dimensions of cross '
'correlation array, INPUT: %d, EXPECTED: 1 or 2'
', i.e., vector or matrix' % c.ndim)
def _sumsqr(x):
ax = np.abs(x)
ax *= ax
return ax.sum()
# vector
if c.ndim == 1:
# need this for either cross or auto
cdiv = _sumsqr(x)
# y is given so we computed a cross correlation
if y is not None:
cdiv *= _sumsqr(y)
cdiv = np.sqrt(cdiv)
else: # matrix case
# locations of lag 0 in a flat arrangement
vals = c[lags.max(), _diag_inds_n(int(np.sqrt(c.shape[1])))]
# scale by lag 0 of each column pair
np.sqrt(vals, vals)
cdiv = np.outer(vals, vals).ravel()
return c / cdiv
def _none(c, x, y, lags, lsize):
"""Do nothing with the input and return `c`.
Parameters
----------
c, x, y, lags : array_like
lsize : int
"""
return c
_SCALE_FUNCTIONS = {
None: _none,
'none': _none,
'unbiased': _unbiased,
'biased': _biased,
'normalize': _normalize
}
_SCALE_KEYS = tuple(_SCALE_FUNCTIONS.keys())
[docs]def xcorr(x, y=None, maxlags=None, detrend=None, scale_type=None):
"""Compute the cross correlation of `x` and `y`.
This function computes the cross correlation of `x` and `y`. It uses the
equivalence of the cross correlation with the negative convolution computed
using a FFT to achieve much faster cross correlation than is possible with
the signal processing definition.
By default it computes the unnormalized cross correlation.
Parameters
----------
x : array_like
The array to correlate.
y : array_like, optional
If y is None or equal to `x` or x and y reference the same object,
the autocorrelation is computed.
maxlags : int, optional
The maximum lag at which to compute the cross correlation. Must be less
than or equal to the max(x.size, y.size) if not None.
detrend : callable, optional
A callable to detrend the data. It must take a single parameter and
return an array
scale_type : {None, 'none', 'unbiased', 'biased', 'normalize'}, optional
* The type of scaling to perform on the data
* The default of 'normalize' returns the cross correlation scaled by
the lag 0 cross correlation i.e., the cross correlation scaled by the
product of the standard deviations of the arrays at lag 0.
Raises
------
AssertionError
* If `y` is not None and `x` is a matrix
* If `x` is not a vector when `y` is None or `y` is `x` or
``all(x == y)``.
* If `detrend` is not callable
* If `scale_type` is not a string or ``None``
* If `scale_type` is not in ``(None, 'none', 'unbiased', 'biased',
'normalize')``
* If `maxlags` ``>`` `lsize`, see source for details.
Returns
-------
c : Series or DataFrame or array_like
Autocorrelation of `x` if `y` is ``None``, cross-correlation of `x` if
`x` is a matrix and `y` is ``None``, or the cross-correlation of `x`
and `y` if both `x` and `y` are vectors.
"""
assert x.ndim in (1, 2), 'x must be a 1D or 2D array'
assert callable(detrend) or detrend is None, \
'detrend must be a callable object or None'
assert isinstance(scale_type, basestring) or scale_type is None, \
'"scale_type" must be a string or None'
assert scale_type in _SCALE_KEYS, ('"scale_type" must be one of '
'{0}'.format(_SCALE_KEYS))
if detrend is None:
detrend = lambda x: x
x = detrend(x)
if x.ndim == 2 and np.greater(x.shape, 1).all():
assert y is None, 'y argument not allowed when x is a 2D array'
lsize = x.shape[0]
inputs = x,
corrfunc = _matrixcorr
elif (y is None or y is x or np.array_equal(x, y) or
(x.shape == y.shape and np.allclose(x, y))):
assert isvector(x), 'x must be 1D'
lsize = x.shape[0]
inputs = x,
corrfunc = _autocorr
else:
lsize = max(x.size, y.size)
y = detrend(y)
inputs = x, y
corrfunc = _crosscorr
nfft = 2 ** nextpow2(2 * lsize - 1)
ctmp = corrfunc(*inputs, nfft=nfft)
if maxlags is None:
maxlags = lsize
assert maxlags <= lsize, ('max lags must be less than or equal to %i'
% lsize)
lags = np.r_[1 - maxlags:maxlags]
if isinstance(x, Series):
return_type = lambda y: Series(y, lags)
elif isinstance(x, DataFrame):
columns = create_repeating_multi_index(x.columns)
return_type = lambda y: DataFrame(y, lags, columns)
elif isinstance(x, np.ndarray):
return_type = lambda x: np.asanyarray(x)
scale_function = _SCALE_FUNCTIONS[scale_type]
ret_func = compose(return_type, scale_function)
return ret_func(ctmp.take(lags, axis=0), x, y, lags, lsize)
if __name__ == '__main__':
from span import PandasTank
import os
span_data_path = os.environ['SPAN_DATA_PATH'] # pragma: no cover
f = os.path.join(span_data_path, 'correlation_paper', 'Spont_Spikes_'
'091210_p17rat_s4_657umV')
tank = PandasTank(f) # pragma: no cover
sp = tank.spik
thr = sp.threshold(4 * sp.std()) # pragma: no cover
clr = thr.clear_refrac(thr) # pragma: no cover
binned = clr.resample('S', how='sum')
xc = clr.xcorr(binned) # pragma: no cover