diff --git a/brainbox/io/one.py b/brainbox/io/one.py index 1445ea0d6..b813f1f76 100644 --- a/brainbox/io/one.py +++ b/brainbox/io/one.py @@ -1204,7 +1204,7 @@ def timesprobe2times(self, values, direction='forward'): elif direction == 'reverse': return self._sync['reverse'](values) / self._sync['fs'] - def samples2times(self, values, direction='forward'): + def samples2times(self, values, direction='forward', band='ap'): """ Converts ephys sample values to session main clock seconds :param values: numpy array of times in seconds or samples to resync @@ -1212,6 +1212,8 @@ def samples2times(self, values, direction='forward'): (seconds main time to samples probe time) :return: """ + if band == 'lf': + values *= 12 self._get_probe_info() return self._sync[direction](values) diff --git a/ibllib/pipes/dynamic_pipeline.py b/ibllib/pipes/dynamic_pipeline.py index a7f64e8e4..c8e6ea119 100644 --- a/ibllib/pipes/dynamic_pipeline.py +++ b/ibllib/pipes/dynamic_pipeline.py @@ -20,6 +20,7 @@ :class:`ibllib.io.extractors.base.BaseBpodTrialsExtractor` class, and located in either the personal projects repo or in :py:mod:`ibllib.io.extractors.bpod_trials` module. """ + import logging import re from fnmatch import fnmatch @@ -71,7 +72,7 @@ def acquisition_description_legacy_session(session_path, save=False): def get_acquisition_description(protocol): - """" + """ " This is a set of example acquisition descriptions for experiments - choice_world_recording - choice_world_biased @@ -80,7 +81,7 @@ def get_acquisition_description(protocol): - choice_world_passive That are part of the IBL pipeline """ - if 'ephys' in protocol: # canonical ephys + if 'ephys' in protocol: # canonical ephys devices = { 'cameras': { 'right': {'collection': 'raw_video_data', 'sync_label': 'audio'}, @@ -89,38 +90,32 @@ def get_acquisition_description(protocol): }, 'neuropixel': { 'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'}, - 'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'} - }, - 'microphone': { - 'microphone': {'collection': 'raw_behavior_data', 'sync_label': None} + 'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'}, }, + 'microphone': {'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}}, } acquisition_description = { # this is the current ephys pipeline description 'devices': devices, 'tasks': [ {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}}, - {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}} + {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}, ], - 'sync': { - 'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'} - }, + 'sync': {'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}}, 'procedures': ['Ephys recording with acute probe(s)'], - 'projects': ['ibl_neuropixel_brainwide_01'] + 'projects': ['ibl_neuropixel_brainwide_01'], } else: devices = { 'cameras': { 'left': {'collection': 'raw_video_data', 'sync_label': 'audio'}, }, - 'microphone': { - 'microphone': {'collection': 'raw_behavior_data', 'sync_label': None} - }, + 'microphone': {'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}}, } acquisition_description = { # this is the current ephys pipeline description 'devices': devices, 'sync': {'bpod': {'collection': 'raw_behavior_data'}}, 'procedures': ['Behavior training/tasks'], - 'projects': 
['ibl_neuropixel_brainwide_01'] + 'projects': ['ibl_neuropixel_brainwide_01'], } if 'biased' in protocol: key = 'biasedChoiceWorld' @@ -130,10 +125,7 @@ def get_acquisition_description(protocol): key = 'habituationChoiceWorld' else: raise ValueError(f'Unknown protocol "{protocol}"') - acquisition_description['tasks'] = [{key: { - 'collection': 'raw_behavior_data', - 'sync_label': 'bpod' - }}] + acquisition_description['tasks'] = [{key: {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}}] acquisition_description['version'] = '1.0.0' return acquisition_description @@ -224,7 +216,7 @@ def _get_trials_tasks(session_path, acquisition_description=None, sync_tasks=Non kwargs = {'session_path': session_path, 'one': one} # Syncing tasks - (sync, sync_args), = acquisition_description['sync'].items() + ((sync, sync_args),) = acquisition_description['sync'].items() sync_label = _sync_label(sync, **sync_args) # get the format of the DAQ data. This informs the extractor task sync_args['sync_collection'] = sync_args.pop('collection') # rename the key so it matches task run arguments sync_args['sync_ext'] = sync_args.pop('extension', None) @@ -268,15 +260,16 @@ def _get_trials_tasks(session_path, acquisition_description=None, sync_tasks=Non else: # lookup in the project extraction repo if we find an extractor class import projects.extraction_tasks + if hasattr(projects.extraction_tasks, extractor): task = getattr(projects.extraction_tasks, extractor) elif hasattr(projects.extraction_tasks, extractor + sync_label.capitalize()): task = getattr(btasks, extractor + sync_label.capitalize()) else: raise NotImplementedError( - f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects') - _logger.debug('%s (protocol #%i, task #%i) = %s.%s', - protocol, i, j, task.__module__, task.__name__) + f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects' + ) + _logger.debug('%s (protocol #%i, task #%i) = %s.%s', protocol, i, j, task.__module__, task.__name__) # Rename the class to something more informative task_name = f'{task.__name__}_{i:02}' if not (task.__name__.startswith('TrainingStatus') or task.__name__.endswith('RegisterRaw')): @@ -314,13 +307,16 @@ def _get_trials_tasks(session_path, acquisition_description=None, sync_tasks=Non raise NotImplementedError(f'No trials task available for sync namespace "{sync_label}"') compute_status = True tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})( - **kwargs, **task_kwargs) + **kwargs, **task_kwargs + ) parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})( - **kwargs, **sync_kwargs, **task_kwargs, parents=parents) + **kwargs, **sync_kwargs, **task_kwargs, parents=parents + ) if compute_status: - tasks[f'TrainingStatus_{protocol}_{i:02}'] = type(f'TrainingStatus_{protocol}_{i:02}', ( - btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']]) + tasks[f'TrainingStatus_{protocol}_{i:02}'] = type( + f'TrainingStatus_{protocol}_{i:02}', (btasks.TrainingStatus,), {} + )(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']]) return tasks @@ -411,11 +407,12 @@ def make_pipeline(session_path, **pkwargs): kwargs = {'session_path': session_path, 'one': pkwargs.get('one')} # Registers the experiment description file - tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw', - 
(bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs) + tasks['ExperimentDescriptionRegisterRaw'] = type( + 'ExperimentDescriptionRegisterRaw', (bstasks.ExperimentDescriptionRegisterRaw,), {} + )(**kwargs) # Syncing tasks - (sync, sync_args), = acquisition_description['sync'].items() + ((sync, sync_args),) = acquisition_description['sync'].items() sync_args = sync_args.copy() # ensure acquisition_description unchanged sync_label = _sync_label(sync, **sync_args) # get the format of the DAQ data. This informs the extractor task sync_args['sync_collection'] = sync_args.pop('collection') # rename the key so it matches task run arguments @@ -426,14 +423,16 @@ def make_pipeline(session_path, **pkwargs): if sync_label == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs) tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})( - **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']]) + **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']] + ) sync_tasks = [tasks[f'SyncPulses_{sync}']] elif sync_label == 'timeline': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs) elif sync_label == 'nidq': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs) tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})( - **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']]) + **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']] + ) sync_tasks = [tasks[f'SyncPulses_{sync}']] elif sync_label == 'tdms': tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs) @@ -441,9 +440,7 @@ def make_pipeline(session_path, **pkwargs): pass # ATM we don't have anything for this; it may not be needed in the future # Behavior tasks - tasks.update( - _get_trials_tasks(session_path, acquisition_description, sync_tasks=sync_tasks, one=pkwargs.get('one')) - ) + tasks.update(_get_trials_tasks(session_path, acquisition_description, sync_tasks=sync_tasks, one=pkwargs.get('one'))) # Ephys tasks if 'neuropixel' in devices: @@ -463,38 +460,46 @@ def make_pipeline(session_path, **pkwargs): if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1): tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})( - **kwargs, **ephys_kwargs, pname=pname) + **kwargs, **ephys_kwargs, pname=pname + ) all_probes.append(pname) register_tasks.append(tasks[f'EphyCompressNP21_{pname}']) elif nptype == 'NP2.4' and nshanks > 1: tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})( - **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks) + **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks + ) register_tasks.append(tasks[f'EphyCompressNP24_{pname}']) all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)] else: tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})( - **kwargs, **ephys_kwargs, pname=pname) + **kwargs, **ephys_kwargs, pname=pname + ) register_tasks.append(tasks[f'EphysCompressNP1_{pname}']) all_probes.append(pname) if nptype == '3A': tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})( - **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks) + **kwargs, 
**ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks + ) for pname in all_probes: register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name] if nptype != '3A': tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})( - **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks) + **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks + ) tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})( - **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']]) + **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']] + ) else: tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})( - **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']]) + **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']] + ) tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})( - **kwargs, **ephys_kwargs, pname=pname, parents=register_task) + **kwargs, **ephys_kwargs, pname=pname, parents=register_task + ) # Video tasks if 'cameras' in devices: @@ -508,35 +513,33 @@ def make_pipeline(session_path, **pkwargs): tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(**kwargs, **video_kwargs) dlc_parent_task = tasks['VideoConvert'] tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})( - **kwargs, **video_kwargs, **sync_kwargs) + **kwargs, **video_kwargs, **sync_kwargs + ) else: - tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})( - **kwargs, **video_kwargs) - tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})( - **kwargs, **video_kwargs, **sync_kwargs) + tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(**kwargs, **video_kwargs) + tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(**kwargs, **video_kwargs, **sync_kwargs) dlc_parent_task = tasks['VideoCompress'] if sync == 'bpod': tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})( - **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']]) + **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + ) elif sync == 'nidq': # Here we restrict to videos that we support (left, right or body) video_kwargs['cameras'] = subset_cams tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})( - **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks) + **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks + ) if sync_kwargs['sync'] != 'bpod': # Here we restrict to videos that we support (left, right or body) # Currently there is no plan to run DLC on the belly cam subset_cams = [c for c in cams if c in ('left', 'right', 'body')] video_kwargs['cameras'] = subset_cams - tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})( - **kwargs, **video_kwargs, parents=[dlc_parent_task]) + tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})(**kwargs, **video_kwargs, parents=[dlc_parent_task]) # The PostDLC plots require a trials object for QC # Find the first task that outputs a trials.table dataset - trials_task = ( - t for t in tasks.values() if any('trials.table' in f[0] for f in t.signature.get('output_files', [])) - ) + trials_task = (t for t in tasks.values() if any('trials.table' in 
f[0] for f in t.signature.get('output_files', []))) if trials_task := next(trials_task, None): parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}'], trials_task] trials_collection = getattr(trials_task, 'output_collection', 'alf') @@ -544,55 +547,81 @@ def make_pipeline(session_path, **pkwargs): parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}']] trials_collection = 'alf' tasks[tn] = type((tn := 'PostDLC'), (vtasks.EphysPostDLC,), {})( - **kwargs, cameras=subset_cams, trials_collection=trials_collection, parents=parents) + **kwargs, cameras=subset_cams, trials_collection=trials_collection, parents=parents + ) # Audio tasks if 'microphone' in devices: - (microphone, micro_kwargs), = devices['microphone'].items() + ((microphone, micro_kwargs),) = devices['microphone'].items() micro_kwargs['device_collection'] = micro_kwargs.pop('collection') if sync_kwargs['sync'] == 'bpod': tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})( - **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection']) + **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection'] + ) elif sync_kwargs['sync'] == 'nidq': tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs) # Widefield tasks if 'widefield' in devices: - (_, wfield_kwargs), = devices['widefield'].items() + ((_, wfield_kwargs),) = devices['widefield'].items() wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection') tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})( - **kwargs, **wfield_kwargs) + **kwargs, **wfield_kwargs + ) tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})( - **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']]) + **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']] + ) tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})( - **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']]) + **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']] + ) tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})( - **kwargs, **wfield_kwargs, **sync_kwargs, - parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks) + **kwargs, + **wfield_kwargs, + **sync_kwargs, + parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks, + ) tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})( - **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']]) + **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']] + ) # Mesoscope tasks if 'mesoscope' in devices: - (_, mscope_kwargs), = devices['mesoscope'].items() + ((_, mscope_kwargs),) = devices['mesoscope'].items() mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection') tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})( - **kwargs, **mscope_kwargs) + **kwargs, **mscope_kwargs + ) tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})( - **kwargs, **mscope_kwargs) + **kwargs, **mscope_kwargs + ) tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})( - **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']]) + **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']] + ) tasks['MesoscopeSync'] = type('MesoscopeSync', 
(mscope_tasks.MesoscopeSync,), {})( - **kwargs, **mscope_kwargs, **sync_kwargs) + **kwargs, **mscope_kwargs, **sync_kwargs + ) tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})( - **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']]) + **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']] + ) if 'neurophotometrics' in devices: - # {'collection': 'raw_photometry_data', 'datetime': '2024-09-18T16:43:55.207000', - # 'fibers': {'G0': {'location': 'NBM'}, 'G1': {'location': 'SI'}}, 'sync_channel': 1} - photometry_kwargs = devices['neurophotometrics'] - tasks['FibrePhotometrySync'] = type('FibrePhotometrySync', ( - ptasks.FibrePhotometrySync,), {})(**kwargs, **photometry_kwargs) + # note: devices['neurophotometrics'] is the acquisition_description + sync_mode = devices['neurophotometrics'].get('sync_mode', 'bpod') # default to bpod for downward compatibility + match sync_mode: + case 'bpod': + # for synchronization with the BNC inputs of the neurophotometrics receiving the sync pulses + # from the individual bpods + tasks['FibrePhotometryBpodSync'] = type('FibrePhotometryBpodSync', (ptasks.FibrePhotometryBpodSync,), {})( + **devices['neurophotometrics'], + **kwargs, + ) + case 'daqami': + # for synchronization with the DAQami receiving the sync pulses from the individual bpods + # as well as the frame clock from the FP3002 + tasks['FibrePhotometryDAQSync'] = type('FibrePhotometryDAQSync', (ptasks.FibrePhotometryDAQSync,), {})( + **devices['neurophotometrics'], + **kwargs, + ) p = mtasks.Pipeline(session_path=session_path, **pkwargs) p.tasks = tasks diff --git a/ibllib/pipes/neurophotometrics.py b/ibllib/pipes/neurophotometrics.py index 18f558c59..bf0e3d43d 100644 --- a/ibllib/pipes/neurophotometrics.py +++ b/ibllib/pipes/neurophotometrics.py @@ -1,186 +1,286 @@ -"""Extraction tasks for fibrephotometry""" - import logging +from pathlib import Path import numpy as np import pandas as pd +from typing import Tuple import ibldsp.utils import ibllib.io.session_params from ibllib.pipes import base_tasks from iblutil.io import jsonable +from nptdms import TdmsFile + +from abc import abstractmethod +from iblphotometry import io as fpio + _logger = logging.getLogger('ibllib') -""" -Neurophotometrics FP3002 specific information. -The light source map refers to the available LEDs on the system. -The flags refers to the byte encoding of led states in the system. 
-""" -LIGHT_SOURCE_MAP = { - 'color': ['None', 'Violet', 'Blue', 'Green'], - 'wavelength': [0, 415, 470, 560], - 'name': ['None', 'Isosbestic', 'GCaMP', 'RCaMP'], -} - -LED_STATES = { - 'Condition': { - 0: 'No additional signal', - 1: 'Output 1 signal HIGH', - 2: 'Output 0 signal HIGH', - 3: 'Stimulation ON', - 4: 'GPIO Line 2 HIGH', - 5: 'GPIO Line 3 HIGH', - 6: 'Input 1 HIGH', - 7: 'Input 0 HIGH', - 8: 'Output 0 signal HIGH + Stimulation', - 9: 'Output 0 signal HIGH + Input 0 signal HIGH', - 10: 'Input 0 signal HIGH + Stimulation', - 11: 'Output 0 HIGH + Input 0 HIGH + Stimulation', - }, - 'No LED ON': {0: 0, 1: 8, 2: 16, 3: 32, 4: 64, 5: 128, 6: 256, 7: 512, 8: 48, 9: 528, 10: 544, 11: 560}, - 'L415': {0: 1, 1: 9, 2: 17, 3: 33, 4: 65, 5: 129, 6: 257, 7: 513, 8: 49, 9: 529, 10: 545, 11: 561}, - 'L470': {0: 2, 1: 10, 2: 18, 3: 34, 4: 66, 5: 130, 6: 258, 7: 514, 8: 50, 9: 530, 10: 546, 11: 562}, - 'L560': {0: 4, 1: 12, 2: 20, 3: 36, 4: 68, 5: 132, 6: 260, 7: 516, 8: 52, 9: 532, 10: 548, 11: 564} -} - - -def _channel_meta(light_source_map=None): - """ - Return table of light source wavelengths and corresponding colour labels. - - Parameters - ---------- - light_source_map : dict - An optional map of light source wavelengths (nm) used and their corresponding colour name. - - Returns - ------- - pandas.DataFrame - A sorted table of wavelength and colour name. - """ - light_source_map = light_source_map or LIGHT_SOURCE_MAP - meta = pd.DataFrame.from_dict(light_source_map) - meta.index.rename('channel_id', inplace=True) - return meta - - -class FibrePhotometrySync(base_tasks.DynamicTask): +def extract_timestamps_from_tdms_file(tdms_filepath: Path) -> dict: + # extractor for tdms files as written by the daqami software, configured + # for neurophotometrics experiments: Frameclock is in AI7, DI1-4 are the + # bpod sync signals + + tdms_file = TdmsFile.read(tdms_filepath) + groups = tdms_file.groups() + # this unfortunate hack is in here because there are a bunch of sessions where the frameclock is on DI0 + if len(groups) == 1: + has_analog_group = False + (digital_group,) = groups + if len(groups) == 2: + has_analog_group = True + analog_group, digital_group = groups + fs = digital_group.properties['ScanRate'] # this should be 10kHz + df = tdms_file.as_dataframe() + col = df.columns[-1] + vals = df[col].values.astype('int64') + columns = ['DI0', 'DI1', 'DI2', 'DI3'] + + # ugly but basically just a binary decoder for the binary data + # assumes 4 channels + data = np.array([list(bin(v)[2:].zfill(4)[::-1]) for v in vals], dtype='int64') + timestamps = {} + for i, name in enumerate(columns): + signal = data[:, i] + timestamps[name] = np.where(np.diff(signal) == 1)[0] / fs + + if has_analog_group: + # frameclock data is recorded on an analog channel + for channel in analog_group.channels(): + signal = (channel.data > 2.5).astype('int64') # assumes 0-5V + timestamps[channel.name] = np.where(np.diff(signal) == 1)[0] / fs + + return timestamps + + +class FibrePhotometryBaseSync(base_tasks.DynamicTask): + # base clas for syncing fibre photometry + # derived classes are: FibrePhotometryBpodSync and FibrePhotometryDAQSync priority = 90 job_size = 'small' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.device_collection = self.get_device_collection( - 'neurophotometrics', device_collection='raw_photometry_data') + self.photometry_collection = kwargs['collection'] # raw_photometry_data + self.kwargs = kwargs + # we will work with the first protocol here for task in 
self.session_params['tasks']: self.task_protocol = next(k for k in task) self.task_collection = ibllib.io.session_params.get_task_collection(self.session_params, self.task_protocol) break - @property - def signature(self): - signature = { - 'input_files': [('_neurophotometrics_fpData.raw.pqt', self.device_collection, True, True), - ('_iblrig_taskData.raw.jsonable', self.task_collection, True, True), - ('_neurophotometrics_fpData.channels.csv', self.device_collection, True, True), - ('_neurophotometrics_fpData.digitalIntputs.pqt', self.device_collection, True)], - 'output_files': [('photometry.signal.pqt', 'alf/photometry', True), - ('photometryROI.locations.pqt', 'alf/photometry', True)] - } - return signature - - def _sync_bpod_neurophotometrics(self): - """ - Perform the linear clock correction between bpod and neurophotometrics timestamps. - :return: interpolation function that outputs bpod timestamsp from neurophotometrics timestamps - """ - folder_raw_photometry = self.session_path.joinpath(self.device_collection) - df_digital_inputs = pd.read_parquet(folder_raw_photometry.joinpath('_neurophotometrics_fpData.digitalIntputs.pqt')) - # normally we should disregard the states and use the sync label. But bpod doesn't log TTL outs, - # only the states. This will change in the future but for now we are stuck with this. + def _get_bpod_timestamps(self) -> Tuple[np.ndarray, list]: + # the timestamps for syncing, in the time of the bpod if 'habituation' in self.task_protocol: sync_states_names = ['iti', 'reward'] else: sync_states_names = ['trial_start', 'reward', 'exit_state'] + # read in the raw behaviour data for syncing file_jsonable = self.session_path.joinpath(self.task_collection, '_iblrig_taskData.raw.jsonable') - trials_table, bpod_data = jsonable.load_task_jsonable(file_jsonable) + _, bpod_data = jsonable.load_task_jsonable(file_jsonable) + # we get the timestamps of the states from the bpod data - tbpod = [] - for sname in sync_states_names: - tbpod.append(np.array( - [bd['States timestamps'][sname][0][0] + bd['Trial start timestamp'] for bd in bpod_data if - sname in bd['States timestamps']])) - tbpod = np.sort(np.concatenate(tbpod)) - tbpod = tbpod[~np.isnan(tbpod)] - # we get the timestamps for the photometry data - tph = df_digital_inputs['SystemTimestamp'].values[df_digital_inputs['Channel'] == self.kwargs['sync_channel']] - tph = tph[15:] # TODO: we may want to detect the spacers before removing it, especially for successive sessions + timestamps_bpod = [] + for sync_name in sync_states_names: + timestamps_bpod.append( + np.array( + [ + data['States timestamps'][sync_name][0][0] + data['Trial start timestamp'] + for data in bpod_data + if sync_name in data['States timestamps'] + ] + ) + ) + timestamps_bpod = np.sort(np.concatenate(timestamps_bpod)) + timestamps_bpod = timestamps_bpod[~np.isnan(timestamps_bpod)] + return timestamps_bpod, bpod_data + + @abstractmethod + def _get_neurophotometrics_timestamps(self) -> np.ndarray: + # this function needs to be implemented in the derived classes: + # for bpod based syncing, the timestamps are in the digial inputs file + # for daq based syncing, the timestamps are extracted from the tdms file + ... 
+ + def _get_sync_function(self) -> Tuple[callable, list]: + # returns the synchronization function + + # get the timestamps + timestamps_bpod, bpod_data = self._get_bpod_timestamps() + timestamps_nph = self._get_neurophotometrics_timestamps() + # sync the behaviour events to the photometry timestamps - fcn_nph_to_bpod_times, drift_ppm, iph, ibpod = ibldsp.utils.sync_timestamps( - tph, tbpod, return_indices=True, linear=True) - # then we check the alignment, should be less than the screen refresh rate - tcheck = fcn_nph_to_bpod_times(tph[iph]) - tbpod[ibpod] + sync_nph_to_bpod_fcn, drift_ppm, ix_nph, ix_bpod = ibldsp.utils.sync_timestamps( + timestamps_nph, timestamps_bpod, return_indices=True, linear=True + ) + # TODO log drift + + # then we check the alignment, should be less than the camera sampling rate + tcheck = sync_nph_to_bpod_fcn(timestamps_nph[ix_nph]) - timestamps_bpod[ix_bpod] _logger.info( - f'sync: n trials {len(bpod_data)}, n bpod sync {len(tbpod)}, n photometry {len(tph)}, n match {len(iph)}') + f'sync: n trials {len(bpod_data)}' + f'n bpod sync {len(timestamps_bpod)}' + f'n photometry {len(timestamps_nph)}, n match {len(ix_nph)}' + ) + # TODO the framerate here is hardcoded, infer it instead! assert np.all(np.abs(tcheck) < 1 / 60), 'Sync issue detected, residual above 1/60s' - assert len(iph) / len(tbpod) > 0.95, 'Sync issue detected, less than 95% of the bpod events matched' + assert len(ix_nph) / len(timestamps_bpod) > 0.95, 'Sync issue detected, less than 95% of the bpod events matched' valid_bounds = [bpod_data[0]['Trial start timestamp'] - 2, bpod_data[-1]['Trial end timestamp'] + 2] - return fcn_nph_to_bpod_times, valid_bounds - - def _run(self, **kwargs): - """ - Extract photometry data from the raw neurophotometrics data in parquet - The extraction has 3 main steps: - 1. Synchronise the bpod and neurophotometrics timestamps. - 2. Extract the photometry data from the raw neurophotometrics data. - 3. 
Label the fibers correspondance with brain regions in a small table - :param kwargs: - :return: - """ - # 1) sync: we check the synchronisation, right now we only have bpod but soon the daq will be used - match list(self.session_params['sync'].keys())[0]: - case 'bpod': - fcn_nph_to_bpod_times, valid_bounds = self._sync_bpod_neurophotometrics() - case _: - raise NotImplementedError('Syncing with daq is not supported yet.') - # 2) reformat the raw data with wavelengths and meta-data - folder_raw_photometry = self.session_path.joinpath(self.device_collection) - fp_data = pd.read_parquet(folder_raw_photometry.joinpath('_neurophotometrics_fpData.raw.pqt')) - # Load channels and wavelength information - channel_meta_map = _channel_meta() - if (fn := folder_raw_photometry.joinpath('_neurophotometrics_fpData.channels.csv')).exists(): - led_states = pd.read_csv(fn) - else: - led_states = pd.DataFrame(LED_STATES) - led_states = led_states.set_index('Condition') - # Extract signal columns into 2D array - rois = list(self.kwargs['fibers'].keys()) - out_df = fp_data.filter(items=rois, axis=1).sort_index(axis=1) - out_df['times'] = fcn_nph_to_bpod_times(fp_data['SystemTimestamp']) - out_df['valid'] = np.logical_and(out_df['times'] >= valid_bounds[0], out_df['times'] <= valid_bounds[1]) - out_df['wavelength'] = np.nan - out_df['name'] = '' - out_df['color'] = '' - # Extract channel index - states = fp_data.get('LedState', fp_data.get('Flags', None)) - for state in states.unique(): - ir, ic = np.where(led_states == state) - if ic.size == 0: - continue - for cn in ['name', 'color', 'wavelength']: - out_df.loc[states == state, cn] = channel_meta_map.iloc[ic[0]][cn] - # 3) label the brain regions + + return sync_nph_to_bpod_fcn, valid_bounds + + def load_data(self) -> pd.DataFrame: + # loads the raw photometry data + raw_photometry_folder = self.session_path / self.photometry_collection + raw_neurophotometrics_df = pd.read_parquet(raw_photometry_folder / '_neurophotometrics_fpData.raw.pqt') + return raw_neurophotometrics_df + + def _run(self, **kwargs) -> Tuple[pd.DataFrame, pd.DataFrame]: + # 1) load photometry data + + # note: when loading daq based syncing, the SystemTimestamp column + # will be overridden with the timestamps from the tdms file + # the idea behind this is that the rest of the sync is then the same + # and handled by this base class + raw_df = self.load_data() + + # 2) get the synchronization function + sync_nph_to_bpod_fcn, valid_bounds = self._get_sync_function() + + # 3) convert to ibl_df + ibl_df = fpio.from_raw_neurophotometrics_df_to_ibl_df(raw_df, rois=self.kwargs['fibers'], drop_first=False) + + # 3) apply synchronization + ibl_df['times'] = sync_nph_to_bpod_fcn(raw_df['SystemTimestamp']) + ibl_df['valid'] = np.logical_and(ibl_df['times'] >= valid_bounds[0], ibl_df['times'] <= valid_bounds[1]) + + # 4) write to disk + output_folder = self.session_path.joinpath('alf', 'photometry') + output_folder.mkdir(parents=True, exist_ok=True) + + # writing the synced photometry signal + ibl_df_outpath = output_folder / 'photometry.signal.pqt' + ibl_df.to_parquet(ibl_df_outpath) + + # writing the locations rois = [] - c = 0 for k, v in self.kwargs['fibers'].items(): - rois.append({'ROI': k, 'fiber': f'fiber{c:02d}', 'brain_region': v['location']}) - df_rois = pd.DataFrame(rois).set_index('ROI') - # to finish we write the dataframes to disk - out_path = self.session_path.joinpath('alf', 'photometry') - out_path.mkdir(parents=True, exist_ok=True) - out_df.to_parquet(file_signal := 
out_path.joinpath('photometry.signal.pqt')) - df_rois.to_parquet(file_locations := out_path.joinpath('photometryROI.locations.pqt')) - return file_signal, file_locations + rois.append({'ROI': k, 'fiber': f'fiber_{v["location"]}', 'brain_region': v['location']}) + locations_df = pd.DataFrame(rois).set_index('ROI') + locations_df_outpath = output_folder / 'photometryROI.locations.pqt' + locations_df.to_parquet(locations_df_outpath) + return ibl_df, locations_df + + +class FibrePhotometryBpodSync(FibrePhotometryBaseSync): + priority = 90 + job_size = 'small' + + @property + def signature(self): + signature = { + 'input_files': [ + ('_neurophotometrics_fpData.raw.pqt', self.photometry_collection, True, True), + ('_iblrig_taskData.raw.jsonable', self.task_collection, True, True), + ('_neurophotometrics_fpData.channels.csv', self.photometry_collection, True, True), + ('_neurophotometrics_fpData.digitalIntputs.pqt', self.photometry_collection, True), + ], + 'output_files': [ + ('photometry.signal.pqt', 'alf/photometry', True), + ('photometryROI.locations.pqt', 'alf/photometry', True), + ], + } + return signature + + def _get_neurophotometrics_timestamps(self) -> np.ndarray: + # for bpod based syncing, the timestamps for syncing are in the digital inputs file + raw_photometry_folder = self.session_path / self.photometry_collection + digital_inputs_df = pd.read_parquet(raw_photometry_folder / '_neurophotometrics_fpData.digitalIntputs.pqt') + timestamps_nph = digital_inputs_df['SystemTimestamp'].values[digital_inputs_df['Channel'] == self.kwargs['sync_channel']] + + # TODO replace this rudimentary spacer removal + # to implement: detect spacer / remove spacer methods + timestamps_nph = timestamps_nph[15:] + return timestamps_nph + + +class FibrePhotometryDAQSync(FibrePhotometryBaseSync): + priority = 90 + job_size = 'small' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sync_kwargs = kwargs['sync_metadata'] + self.sync_channel = kwargs['sync_channel'] + + @property + def signature(self): + signature = { + 'input_files': [ + ('_neurophotometrics_fpData.raw.pqt', self.photometry_collection, True, True), + ('_iblrig_taskData.raw.jsonable', self.task_collection, True, True), + ('_neurophotometrics_fpData.channels.csv', self.photometry_collection, True, True), + ('_mcc_DAQdata.raw.tdms', self.sync_kwargs['collection'], True, True), + ], + 'output_files': [ + ('photometry.signal.pqt', 'alf/photometry', True), + ('photometryROI.locations.pqt', 'alf/photometry', True), + ], + } + return signature + + def load_data(self) -> pd.DataFrame: + # the point of this functions is to overwrite the SystemTimestamp column + # in the ibl_df with the values from the DAQ clock + # then syncing will work the same as for the bpod based syncing + raw_df = super().load_data() + + # get daqami timestamps + tdms_filepath = self.session_path / self.sync_kwargs['collection'] / '_mcc_DAQdata.raw.tdms' + self.timestamps = extract_timestamps_from_tdms_file(tdms_filepath) + # downward compatibility - frameclock moved around, now is back on the AI7 + # was specified with int before. 
if int + try: + int(self.sync_kwargs['frameclock_channel']) + sync_channel_name = f'DI{self.sync_kwargs["frameclock_channel"]}' + except ValueError: + sync_channel_name = self.sync_kwargs['frameclock_channel'] + frame_timestamps = self.timestamps[sync_channel_name] + + # compare number of frame timestamps + # and put them in the raw_df SystemTimestamp column + if raw_df.shape[0] == frame_timestamps.shape[0]: + raw_df['SystemTimestamp'] = frame_timestamps + elif raw_df.shape[0] == frame_timestamps.shape[0] + 1: + # there is one extra frame timestamp from the last incomplete frame + raw_df['SystemTimestamp'] = frame_timestamps[:-1] + elif raw_df.shape[0] > frame_timestamps.shape[0]: + # the daqami was stopped / closed before bonsai + # we discard all frames that can not be mapped + _logger.warning(f'#frames bonsai: {raw_df.shape[0]} > #frames daqami {frame_timestamps.shape[0]}, dropping excess') + raw_df = raw_df.iloc[: frame_timestamps.shape[0]] + + elif raw_df.shape[0] < frame_timestamps.shape[0]: + # this should not be possible + raise ValueError('more timestamps for frames recorded by the daqami than frames were recorded by bonsai.') + return raw_df + + def _get_neurophotometrics_timestamps(self) -> np.ndarray: + # get the sync channel + # again the ugly downward compatibility hack + try: + int(self.sync_kwargs['frameclock_channel']) + sync_channel_name = f'DI{self.sync_kwargs["frameclock_channel"]}' + except ValueError: + sync_channel_name = self.sync_kwargs['frameclock_channel'] + + # and the corresponding timestamps + timestamps_nph = self.timestamps[sync_channel_name] + + # TODO replace this rudimentary spacer removal + # to implement: detect spacer / remove spacer methods + timestamps_nph = timestamps_nph[15:] + return timestamps_nph diff --git a/ibllib/tests/fixtures/neurophotometrics/_ibl_experiment.description.yaml b/ibllib/tests/fixtures/neurophotometrics/_ibl_experiment.description.yaml new file mode 100644 index 000000000..8a39783cb --- /dev/null +++ b/ibllib/tests/fixtures/neurophotometrics/_ibl_experiment.description.yaml @@ -0,0 +1,35 @@ +devices: + cameras: + left: + collection: raw_video_data + sync_label: audio + microphone: + microphone: + collection: raw_task_data_00 + sync_label: audio + neurophotometrics: + collection: raw_photometry_data + datetime: '2025-05-26T15:08:40.237101' + fibers: + G0: + location: VTA + sync_channel: 2 + sync_metadata: + acquisition_software: daqami + collection: raw_photometry_data + frameclock_channel: 7 + sync_mode: daqami +procedures: +- Fiber photometry +projects: +- ibl_fibrephotometry +- practice +sync: + bpod: + acquisition_software: pybpod + collection: raw_task_data_00 + extension: .jsonable +tasks: +- _iblrig_tasks_advancedChoiceWorld: + collection: raw_task_data_00 +version: 1.0.0 diff --git a/ibllib/tests/test_neurophotometrics.py b/ibllib/tests/test_neurophotometrics.py new file mode 100644 index 000000000..fcad9d379 --- /dev/null +++ b/ibllib/tests/test_neurophotometrics.py @@ -0,0 +1,37 @@ +"""Tests for ibllib.pipes.mesoscope_tasks.""" + +import sys +import unittest +from unittest import mock +import tempfile +from pathlib import Path + + +from ibllib.io import session_params + +# Mock suit2p which is imported in MesoscopePreprocess +attrs = {'default_ops.return_value': {}} +sys.modules['suite2p'] = mock.MagicMock(**attrs) + + +class TestNeurophotometricsExtractor(unittest.TestCase): + """ + this class tests + that the correct extractor is run based on the experiment description file + this requires the setup to have + + """ 
+
+    def setUp(self) -> None:
+        self.tmp_folder = tempfile.TemporaryDirectory()
+        self.session_folder = Path(self.tmp_folder.name) / 'subject' / '2020-01-01' / '001'
+        self.raw_photometry_folder = self.session_folder / 'raw_photometry_data'
+        self.raw_photometry_folder.mkdir(parents=True)
+
+    def test_bpod_extractor(self):
+        path = Path(__file__).parent / 'fixtures' / 'neurophotometrics' / '_ibl_experiment_description_bpod.yaml'
+        self.experiment_description = session_params.read_params(path)
+
+    def test_daqami_extractor(self):
+        path = Path(__file__).parent / 'fixtures' / 'neurophotometrics' / '_ibl_experiment_description_bpod.yaml'
+        self.experiment_description = session_params.read_params(path)
diff --git a/requirements.txt b/requirements.txt
index 9649c1e1c..089e2bc9e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -33,3 +33,4 @@ seaborn>=0.9.0
 slidingRP>=1.1.1 # steinmetz lab refractory period metrics
 sparse
 tqdm>=4.32.1
+ibl-photometry
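
A note on the samples2times change in brainbox/io/one.py: the new band='lf' argument rescales LF-band sample indices to AP-band samples before the probe-to-session sync is applied, on the assumption that the LF band is acquired at 1/12 of the AP rate (2,500 Hz vs 30,000 Hz on Neuropixels 1.0). Note that `values *= 12` also modifies the caller's array in place when a NumPy array is passed. A minimal sketch of the intended conversion (illustrative helper, not part of the patch):

import numpy as np

AP_PER_LF_SAMPLE = 12  # assumption: 30 kHz AP / 2.5 kHz LF on Neuropixels 1.0

def lf_samples_to_ap_samples(lf_samples):
    # returns a rescaled copy, unlike the in-place `values *= 12` in the patch
    return np.asarray(lf_samples) * AP_PER_LF_SAMPLE

print(lf_samples_to_ap_samples([0, 2_500, 5_000]))  # [0 30000 60000], i.e. 0 s, 1 s and 2 s worth of AP samples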
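
The reformatted `((sync, sync_args),) = acquisition_description['sync'].items()` lines in ibllib/pipes/dynamic_pipeline.py keep the original behaviour: the pattern unpacks a dictionary that must contain exactly one entry and raises otherwise. A small illustration:

sync_description = {'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}}
((sync, sync_args),) = sync_description.items()
print(sync, sync_args['collection'])  # nidq raw_ephys_data
# with zero entries, or with more than one sync device, the same line raises ValueError, acting as a guard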
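
Throughout make_pipeline, tasks are instantiated via `type(name, (Base,), {})(**kwargs)` so that every probe or protocol gets a one-off subclass with an informative class name. A self-contained sketch of the pattern using a stand-in base class rather than the real ibllib task classes:

class BaseTask:  # stand-in for e.g. etasks.SpikeSorting
    def __init__(self, session_path=None, pname=None, parents=None, **kwargs):
        self.session_path = session_path
        self.pname = pname
        self.parents = parents or []

task = type('Spikesorting_probe00', (BaseTask,), {})(session_path='/data/subject/2020-01-01/001', pname='probe00')
print(type(task).__name__, isinstance(task, BaseTask))  # Spikesorting_probe00 True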
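
The new sync_mode key on the neurophotometrics device selects between FibrePhotometryBpodSync and FibrePhotometryDAQSync in make_pipeline (the match/case dispatch requires Python 3.10+ and defaults to 'bpod' when the key is absent). A sketch of reading that field from the fixture added in this diff, assuming it is run from a repository checkout; session_params.read_params is the same call used in test_neurophotometrics.py:

from ibllib.io import session_params

description = session_params.read_params('ibllib/tests/fixtures/neurophotometrics/_ibl_experiment.description.yaml')
sync_mode = description['devices']['neurophotometrics'].get('sync_mode', 'bpod')  # default mirrors make_pipeline
# 'bpod'   -> FibrePhotometryBpodSync: bpod TTLs recorded on the FP3002 digital inputs
# 'daqami' -> FibrePhotometryDAQSync: bpod TTLs and the FP3002 frame clock recorded on a DAQami (tdms)
print(sync_mode)  # 'daqami' for this fixture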
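
In extract_timestamps_from_tdms_file the DAQami digital port is stored as a single integer per sample and decoded into per-line rising-edge times through binary strings. Below is a vectorised sketch of the same decoding using bit shifts, under the same assumptions as the original (DI0 in bit 0, rising edges reported as the sample index before the transition divided by the scan rate); it is an alternative formulation, not the code in the patch:

import numpy as np

def decode_digital_lines(port_values, n_lines=4, fs=10_000.0):
    # port_values: one integer per sample, encoding the digital lines as bits
    port_values = np.asarray(port_values, dtype='int64')
    timestamps = {}
    for line in range(n_lines):
        signal = (port_values >> line) & 1
        timestamps[f'DI{line}'] = np.where(np.diff(signal) == 1)[0] / fs
    return timestamps

print(decode_digital_lines([0, 0, 0, 1, 1, 0, 1], n_lines=1, fs=10.0))  # {'DI0': array([0.2, 0.5])}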
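
_get_sync_function delegates the clock alignment to `ibldsp.utils.sync_timestamps(timestamps_nph, timestamps_bpod, return_indices=True, linear=True)`, which matches events across the two clocks and fits a linear correction. A toy illustration of the linear-fit part only, with the event matching assumed already done (the real call additionally returns the matched indices and the drift in ppm):

import numpy as np

t_nph = np.array([1.0, 11.0, 21.0, 31.0])   # sync events on the photometry clock (s)
t_bpod = 1.000002 * t_nph + 5.0             # the same events on the bpod clock: small drift plus an offset

slope, intercept = np.polyfit(t_nph, t_bpod, 1)

def nph2bpod(t):
    return slope * t + intercept

drift_ppm = (slope - 1) * 1e6                      # ~2 ppm for this toy example
residual = np.abs(nph2bpod(t_nph) - t_bpod).max()  # the task above asserts this stays below 1/60 s
print(round(drift_ppm, 1), residual < 1 / 60)      # 2.0 True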