Coverage for ibllib/pipes/photometry_tasks.py: 76%

46 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1"""Extraction tasks for fibrephotometry""" 

2 

3import logging 

4from collections import OrderedDict 

5 

6from ibllib.pipes import tasks, base_tasks 

7import ibllib.pipes.training_preprocessing as tpp 

8from ibllib.io.extractors.fibrephotometry import FibrePhotometry 

9 

10_logger = logging.getLogger('ibllib') 

11 

12 

class FibrePhotometryRegisterRaw(base_tasks.RegisterRawDataTask):
    """Task declaring the raw fibre photometry files of a session as outputs.

    The task takes no input files; its signature lists the raw DAQ
    (``.tdms``) and Neurophotometrics (``.pqt``) files found in the
    photometry device collection.
    """

    priority = 100
    job_size = 'small'

    def __init__(self, *args, **kwargs):
        """Initialise the task and resolve its collections.

        All positional and keyword arguments are forwarded unchanged to the
        parent constructor. The optional ``collection`` keyword is also used
        to resolve the task collection.
        """
        super().__init__(*args, **kwargs)
        requested_collection = kwargs.get('collection', None)
        self.collection = self.get_task_collection(requested_collection)
        # 'raw_photometry_data' is passed as the device collection for the
        # 'photometry' device (presumably a fallback - confirm in get_device_collection).
        self.device_collection = self.get_device_collection(
            'photometry', device_collection='raw_photometry_data')

    @property
    def signature(self):
        """Return the input/output file specification of the task.

        Each output entry is a 3-tuple; presumably (filename, collection,
        required) - confirm against the base task class.
        """
        return {
            'input_files': [],
            'output_files': [
                ('_mcc_DAQdata.raw.tdms', self.device_collection, True),
                ('_neurophotometrics_fpData.raw.pqt', self.device_collection, True),
            ],
        }

31 

32 

class FibrePhotometryPreprocess(base_tasks.DynamicTask):
    """Task running the fibre photometry extraction for a session.

    Reads the raw DAQ and Neurophotometrics files from the photometry device
    collection and saves the extracted signal under ``alf/photometry``.
    """

    priority = 90
    level = 1

    def __init__(self, session_path, regions=None, **kwargs):
        """Initialise the task.

        :param session_path: session path, forwarded to the parent constructor
            and later used to build the output folder.
        :param regions: value handed as ``regions`` to the extractor when the
            task runs; stored as is.
        :param kwargs: forwarded to the parent constructor; the optional
            ``collection`` entry is used to resolve the task collection.
        """
        super().__init__(session_path, **kwargs)
        # The task collection has to be supplied through the task kwargs
        requested_collection = kwargs.get('collection', None)
        self.collection = self.get_task_collection(requested_collection)
        self.device_collection = self.get_device_collection(
            'photometry', device_collection='raw_photometry_data')
        self.regions = regions

    @property
    def signature(self):
        """Return the input/output file specification of the task.

        Each entry is a 3-tuple; presumably (filename, collection, required)
        - confirm against the base task class.
        """
        raw_collection = self.device_collection
        return {
            'input_files': [
                ('_mcc_DAQdata.raw.tdms', raw_collection, True),
                ('_neurophotometrics_fpData.raw.pqt', raw_collection, True),
            ],
            'output_files': [('photometry.signal.pqt', 'alf/photometry', True)],
        }

    def _run(self, **kwargs):
        """Extract the photometry data and return the files written.

        :param kwargs: accepted for interface compatibility; not used here.
        :return: the second element of the extractor's ``extract`` result.
        """
        extractor = FibrePhotometry(self.session_path, collection=self.device_collection)
        output_folder = self.session_path.joinpath('alf', 'photometry')
        _, out_files = extractor.extract(
            regions=self.regions, path_out=output_folder, save=True)
        return out_files

57 

58 

59# pipeline 

class FibrePhotometryExtractionPipeline(tasks.Pipeline):
    """
    Legacy pipeline that does not rely on the acquisition description file; it is kept
    to process sessions previously acquired at Princeton.
    """
    label = __name__

    def __init__(self, session_path=None, **kwargs):
        """Build the ordered set of tasks making up the pipeline.

        :param session_path: session path handed to the parent constructor
            and to every task.
        :param kwargs: forwarded to the parent constructor.
        """
        # FIXME This should be agnostic to task protocol, for now let's assume it's only training
        super().__init__(session_path, **kwargs)
        self.session_path = session_path

        # level 0: tasks without parents
        register_raw = tpp.TrainingRegisterRaw(self.session_path)
        trials = tpp.TrainingTrials(self.session_path)
        video_compress = tpp.TrainingVideoCompress(self.session_path)
        audio = tpp.TrainingAudio(self.session_path)

        # level 1: tasks depending on a level 0 task
        photometry = FibrePhotometryPreprocess(self.session_path, parents=[trials])
        status = tpp.TrainingStatus(self.session_path, parents=[trials])
        dlc = tpp.TrainingDLC(self.session_path, parents=[video_compress])

        # Insertion order matches the order in which the tasks were created
        self.tasks = OrderedDict([
            ('TrainingRegisterRaw', register_raw),
            ('TrainingTrials', trials),
            ('TrainingVideoCompress', video_compress),
            ('TrainingAudio', audio),
            ('BiasedFibrePhotometry', photometry),
            ('TrainingStatus', status),
            ('TrainingDLC', dlc),
        ])