Coverage for ibllib/pipes/photometry_tasks.py: 76%
46 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
1"""Extraction tasks for fibrephotometry"""
3import logging
4from collections import OrderedDict
6from ibllib.pipes import tasks, base_tasks
7import ibllib.pipes.training_preprocessing as tpp
8from ibllib.io.extractors.fibrephotometry import FibrePhotometry
10_logger = logging.getLogger('ibllib')
13class FibrePhotometryRegisterRaw(base_tasks.RegisterRawDataTask):
15 priority = 100
16 job_size = 'small'
18 def __init__(self, *args, **kwargs):
19 super().__init__(*args, **kwargs) 1ce
20 self.collection = self.get_task_collection(kwargs.get('collection', None)) 1ce
21 self.device_collection = self.get_device_collection('photometry', device_collection='raw_photometry_data') 1ce
23 @property
24 def signature(self):
25 signature = { 1e
26 'input_files': [],
27 'output_files': [('_mcc_DAQdata.raw.tdms', self.device_collection, True),
28 ('_neurophotometrics_fpData.raw.pqt', self.device_collection, True)]
29 }
30 return signature 1e
33class FibrePhotometryPreprocess(base_tasks.DynamicTask):
34 @property
35 def signature(self):
36 signature = { 1db
37 'input_files': [('_mcc_DAQdata.raw.tdms', self.device_collection, True),
38 ('_neurophotometrics_fpData.raw.pqt', self.device_collection, True)],
39 'output_files': [('photometry.signal.pqt', 'alf/photometry', True)]
40 }
41 return signature 1db
43 priority = 90
44 level = 1
46 def __init__(self, session_path, regions=None, **kwargs):
47 super().__init__(session_path, **kwargs) 1dcb
48 # Task collection (this needs to be specified in the task kwargs)
49 self.collection = self.get_task_collection(kwargs.get('collection', None)) 1dcb
50 self.device_collection = self.get_device_collection('photometry', device_collection='raw_photometry_data') 1dcb
51 self.regions = regions 1dcb
53 def _run(self, **kwargs):
54 _, out_files = FibrePhotometry(self.session_path, collection=self.device_collection).extract( 1b
55 regions=self.regions, path_out=self.session_path.joinpath('alf', 'photometry'), save=True)
56 return out_files 1b
59# pipeline
60class FibrePhotometryExtractionPipeline(tasks.Pipeline):
61 """
62 This is a legacy pipeline not using the acquisition description file to acquire previous sessions at Princeton
63 """
64 label = __name__
66 def __init__(self, session_path=None, **kwargs):
67 # FIXME This should be agnostic to task protocol, for now let's assume it's only training
68 super().__init__(session_path, **kwargs)
69 tasks = OrderedDict()
70 self.session_path = session_path
71 # level 0
72 tasks['TrainingRegisterRaw'] = tpp.TrainingRegisterRaw(self.session_path)
73 tasks['TrainingTrials'] = tpp.TrainingTrials(self.session_path)
74 tasks['TrainingVideoCompress'] = tpp.TrainingVideoCompress(self.session_path)
75 tasks['TrainingAudio'] = tpp.TrainingAudio(self.session_path)
76 # level 1
77 tasks['BiasedFibrePhotometry'] = FibrePhotometryPreprocess(self.session_path, parents=[tasks['TrainingTrials']])
78 tasks['TrainingStatus'] = tpp.TrainingStatus(self.session_path, parents=[tasks['TrainingTrials']])
79 tasks['TrainingDLC'] = tpp.TrainingDLC(
80 self.session_path, parents=[tasks['TrainingVideoCompress']])
81 self.tasks = tasks