#!/usr/bin/env python
# tank.py ---
# Copyright (C) 2012 Phillip Cloud <cpcloud@gmail.com>
# Author: Phillip Cloud <cpcloud@gmail.com>
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Examples
--------
>>> import span
>>> from span import ElectrodeMap, NeuroNexusMap
>>> path = 'some/path/to/a/tank/file'
>>> elec_map = ElectrodeMap(NeuroNexusMap.values, 50, 125)
>>> tank = span.tdt.PandasTank(path, elec_map)
"""
import numbers
import os
import re
import warnings
import numpy as np
import pandas as pd
from numpy import nan as NA
from pandas import DataFrame, DatetimeIndex, Series
from pandas.util.decorators import cache_readonly
from span.tdt._read_tev import _read_tev_raw
from span.tdt.spikedataframe import SpikeDataFrame
from span.tdt.spikeglobals import TdtEventTypes, TdtDataTypes
from span.utils import (thunkify, cached_property, fromtimestamp,
assert_nonzero_existing_file, ispower2, OrderedDict,
num2name, LOCAL_TZ, remove_first_pc)


def _python_read_tev_raw(filename, fp_locs, block_size, spikes):
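    """Pure-Python counterpart of the compiled ``_read_tev_raw`` reader:
    seek to each file-pointer location and read one ``block_size``-sample
    block into the preallocated ``spikes`` array.
    """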
dt = spikes.dtype
with open(filename, 'rb') as f:
for i, loc in enumerate(fp_locs):
f.seek(loc)
spikes[i] = np.fromfile(f, dt, block_size)


def _first_int_group(regex, name):
    # ``search`` returns None when nothing matches, so trap AttributeError
    # along with failed integer conversions
    try:
        return int(regex.search(name).group(1))
    except (AttributeError, TypeError, ValueError):  # pragma: no cover
        return None


class TdtTank(object):
"""Base class encapsulating methods for reading a TDT Tank.
Parameters
----------
path : str
The path to the tank file sans extension.
Attributes
----------
dtype
path (``str``) : Full path of the tank sans extensions
name (``str``) : basename of self.path
age (``int``) : The postnatal day age of the animal
site (``int``) : The site number of the recording, can be ``None``
datetime (``datetime.datetime``) : Date and time of the recording
time (``datetime.time``) : Time of the recording
date (``datetime.date``) : Date of the recording
fs (``float``) : sampling rate
start (``Timestamp``) : Start time of the recording
end (``Timestamp``) : End time of the recording
duration (``timedelta64[us]``) : Duration of the recording
"""
_names = ('size', 'type', 'name', 'channel', 'sort_code', 'timestamp',
'fp_loc', 'strobe', 'format', 'fs')
_formats = 'i4', 'i4', 'u4', 'u2', 'u2', 'f8', 'i8', 'f8', 'i4', 'f4'
_offsets = 0, 4, 8, 12, 14, 16, 24, 24, 32, 36
dtype = np.dtype({'names': _names, 'formats': _formats,
'offsets': _offsets}, align=True)
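
    # metadata embedded in the tank's file name, e.g.
    # 'Spont_Spikes_091210_p17rat_s4_657umV' yields age 17 and site 4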
_site_re = re.compile(r'(?:.*s(?:ite)?_?(\d{1,2}))?')
_age_re = re.compile(r'(?<=_)[pP](\d+)')
_header_ext = 'tsq'
_raw_ext = 'tev'

    def __init__(self, path, electrode_map, clean=False):
super(TdtTank, self).__init__()
self.electrode_map = electrode_map
tank_with_ext = path + os.extsep
tev_path = tank_with_ext + self._raw_ext
tsq_path = tank_with_ext + self._header_ext
assert_nonzero_existing_file(tev_path)
assert_nonzero_existing_file(tsq_path)
self.path = path
self.name = os.path.basename(path)
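        # pull the animal's age and the recording site out of the file name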
self.age = _first_int_group(self._age_re, self.name)
self.site = _first_int_group(self._site_re, self.name)
not_na_ts = self.raw.timestamp.dropna()
tstart = pd.datetime.fromtimestamp(not_na_ts.head(1).item())
tend = pd.datetime.fromtimestamp(not_na_ts.tail(1).item())
self.__datetime = pd.Timestamp(tstart)
self.time = self.__datetime.time()
self.date = self.__datetime.date()
self.start = self.__datetime
self.end = pd.Timestamp(tend)
self.duration = np.timedelta64(self.end - self.start)
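        # map the tank's numeric event codes to human-readable names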
unames = self.raw.name.unique()
raw_names = map(lambda x: x or NA, map(num2name, unames))
self.names = Series(raw_names, index=unames)
names = self.names
self.raw.name = names[self.raw.name].reset_index(drop=True)
self._name_mapper = dict(zip(names.str.lower(), names))

        def _try_get_na(x):
try:
return x.item()
except (ValueError, IndexError):
return NA
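        # record one sampling rate per named event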
fs_nona = self.raw.fs.dropna()
name_nona = self.raw.name.dropna()
diter = ((name, _try_get_na(fs_nona[name_nona == num].head(1)))
for num, name in self.names.dropna().iteritems())
self.fs = Series(dict(diter))
self.data_names = self.fs.dropna().index.values.astype(np.str_)
self.clean = clean

    def __repr__(self):
objr = repr(self.__class__)
params = dict(age=self.age, name=self.name, site=self.site, obj=objr,
fs=self.fs.to_dict(), datetime=str(self.datetime),
duration=self.duration / np.timedelta64(1, 'm'))
fmt = ('{obj}\nname: {name}\ndatetime: {datetime}\nage: '
'P{age}\nsite: {site}\nfs: {fs}\n'
'duration: {duration:.2f} min')
return fmt.format(**params)

    @property
def values(self):
return self.raw.values

    @property
def datetime(self):
return self.__datetime.to_pydatetime()

    def __getattr__(self, name):
        mapper = self._name_mapper
        # check to see if something similar was given
        lowered_name = name.lower()
        if lowered_name != name and lowered_name in mapper:
            raise AttributeError('Tried to retrieve the attribute '
                                 '\'%s\', did you mean \'%s\'?'
                                 % (name, lowered_name))
        try:
            return self._tev(mapper[name], self.clean)
        except KeyError:
            # unknown event names must surface as AttributeError, not
            # KeyError, to keep getattr/hasattr semantics intact
            raise AttributeError(name)

    @thunkify
def _raw_tsq(self):
# create the path name
tsq_name = self.path + os.extsep + self._header_ext
# read in the raw data as a numpy rec array and convert to
# DataFrame
tsq = DataFrame(np.fromfile(tsq_name, self.dtype))
inds = tsq.strobe <= np.finfo(np.float64).eps
tsq.strobe[inds] = NA
# zero based indexing
tsq.channel -= 1.0
# -1s are invalid
tsq.channel[tsq.channel == -1.0] = NA
ind = Series(self.electrode_map.shank, self.electrode_map.channel)
        tsq['shank'] = ind[tsq.channel].reset_index(drop=True)
tsq.type = TdtEventTypes[tsq.type].values
tsq.format = TdtDataTypes[tsq.format].values
tsq.timestamp[np.logical_not(tsq.timestamp)] = NA
tsq.fs[np.logical_not(tsq.fs)] = NA
        # the size field counts 4-byte words including the 40-byte header,
        # so remove the header's share for stream events
dt = self.dtype
stream = tsq.type == 'stream'
tsq.size.ix[stream] -= dt.itemsize / dt['size'].itemsize
not_null_strobe = tsq.strobe.notnull()
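        # strobe events carry no channel, sort code, or file pointer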
for key in ('channel', 'sort_code', 'fp_loc'):
try:
tsq[key][not_null_strobe] = NA
except ValueError:
tsq[key] = tsq[key].astype(float)
tsq[key][not_null_strobe] = NA
with warnings.catch_warnings():
warnings.simplefilter('ignore', FutureWarning)
tsq.sort_index(axis=1, inplace=True)
return tsq

    @thunkify
def _get_tsq_event(self, event_name):
"""Read the metadata (TSQ) file of a TDT Tank.
Parameters
----------
event_name : str
Returns
-------
b : pandas.DataFrame
Recording metadata
"""
tsq = self.raw
# make sure there's at least one event
p = self.path
# get the row of the metadata where its value equals the name-number
num_names = tsq.name
row = self.names[num_names].isin([event_name]).values
assert row.any(), 'no event named %s in tank: %s' % (event_name, p)
# get all the metadata for those events
tsq = tsq[row]
# convert to integer where possible
try:
tsq.channel = tsq.channel.astype(int)
tsq.shank = tsq.shank.astype(int)
except ValueError:
pass
first_row = row.argmax()
return tsq, tsq.format[first_row], tsq.size[first_row]

    def tsq(self, event_name):
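        """Return the TSQ metadata, sample format, and block size for
        ``event_name``."""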
return self._get_tsq_event(event_name)()

    @property
def raw(self):
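        """The entire TSQ header file as a DataFrame."""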
return self._raw_tsq()()

    def _tev(self, event_name, clean=True):
        """Return the data from a particular event.

        Parameters
        ----------
        event_name : str
            The name of the event whose data you'd like to retrieve.
        clean : bool, optional
            If true, remove the first principal component of the data.

        Returns
        -------
        tev : SpikeDataFrame
            The raw data from the TEV file
        """
return self._read_tev(event_name, clean)()

    @thunkify
    def _read_tev(self, event_name, clean):
        """Read an event from a TDT Tank tev file.

        Parameters
        ----------
        event_name : str
        clean : bool

        Returns
        -------
        d : SpikeDataFrame

        Raises
        ------
        ValueError
            If there are duplicate file pointer locations

        See Also
        --------
        span.tdt.SpikeDataFrame
        """
meta, dtype, block_size = self.tsq(event_name)
nchannels = meta.channel.dropna().nunique()
nblocks = meta.shape[0]
nsamples = nblocks * block_size // nchannels
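        # every block belongs to exactly one channel, so the per-channel
        # sample count is the total sample count over the channel count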
# raw ndarray for data
spikes = DataFrame(np.empty((nblocks, block_size), dtype=dtype))
tev_name = self.path + os.extsep + self._raw_ext
with warnings.catch_warnings():
warnings.simplefilter('ignore', FutureWarning)
meta.reset_index(drop=True, inplace=True)
# convert timestamps to datetime objects
meta.timestamp = fromtimestamp(meta.timestamp)
meta.fp_loc = meta.fp_loc.astype(int)
index = _create_ns_datetime_index(self.datetime, self.fs[event_name],
nsamples)
sdf = _read_tev(tev_name, meta, block_size, spikes, index,
self.electrode_map, clean)
sdf.isclean = clean
return sdf


# ``PandasTank`` is the name used in the docstring example and in
# ``__main__`` below
PandasTank = TdtTank


def _create_ns_datetime_index(start, fs, nsamples, name='datetime'):
    """Create a nanosecond-resolution DatetimeIndex.

    Parameters
    ----------
    start : datetime
    fs : float
    nsamples : int
    name : str, optional

    Returns
    -------
    index : DatetimeIndex
    """
ns = int(1e9 / fs)
dtstart = np.datetime64(start)
dt = dtstart + np.arange(nsamples) * np.timedelta64(ns, 'ns')
freq = ns * pd.datetools.Nano()
return DatetimeIndex(dt, freq=freq, name=name, tz=LOCAL_TZ)


def _reshape_spikes(df, group_inds):
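    """Reorder the blocks of ``df`` channel by channel, then reshape from
    ``(blocks_per_channel, nchannels, block_size)`` to
    ``(nsamples, nchannels)``.

    ``group_inds`` holds one column of block (row) indices per channel.
    """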
out = df.take(group_inds, axis=0)
shp = out.shape
shpsrt = np.argsort(shp)[::-1]
nchannels = shp[shpsrt[-1]]
return out.transpose(shpsrt).reshape(out.size // nchannels, -1)


def _read_tev(filename, meta, block_size, spikes, index, electrode_map, clean):
assert isinstance(filename, basestring), 'filename must be a string'
assert isinstance(block_size, (numbers.Integral, np.integer)), \
'block_size must be an integer'
assert ispower2(block_size), 'block_size must be a power of 2'
assert isinstance(spikes, DataFrame), 'spikes must be a DataFrame'
assert spikes.shape[1] == block_size, \
'number of columns of spikes must equal block_size'
assert isinstance(index, pd.Index), 'index must be an instance of Index'
assert clean in (0, 1, False, True), 'clean must be a boolean or 0 or 1'
return _read_tev_impl(filename, meta, block_size, spikes, index,
electrode_map, clean)


# the compiled Cython reader; ``_python_read_tev_raw`` above is a drop-in
# pure-Python replacement with the same signature
_raw_reader = _read_tev_raw


def _read_tev_impl(filename, meta, block_size, spikes, index, electrode_map,
                   clean):
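    """Read raw blocks from ``filename`` into ``spikes``, group the blocks
    by channel, and wrap the result in a SpikeDataFrame, removing the
    first principal component when ``clean`` is true.
    """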
fp_loc, channel = meta.fp_loc, meta.channel
_raw_reader(filename, fp_loc, block_size, spikes.values)
items = spikes.groupby(channel).indices.items()
items.sort()
d = OrderedDict(items)
group_inds = np.column_stack(d.itervalues())
reshaped = _reshape_spikes(spikes.values, group_inds)
raw = reshaped.take(electrode_map.channel, axis=1)
df = SpikeDataFrame(raw, index, electrode_map.index, dtype=float)
return remove_first_pc(df) if clean else df


if __name__ == '__main__':
    from span import ElectrodeMap, NeuroNexusMap

    span_data_path = os.environ['SPAN_DATA_PATH']
    elec_map = ElectrodeMap(NeuroNexusMap.values, 50, 125)
    f = os.path.join(span_data_path, 'Spont_Spikes_091210_p17rat_s4_657umV')
    tank = PandasTank(f, elec_map)
    # event data are looked up by lower-cased name through __getattr__
    sp = tank.spik