Coverage for ibllib/qc/base.py: 93%
133 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
1import logging
2from abc import abstractmethod
3from pathlib import Path
4from itertools import chain
6import numpy as np
7from one.api import ONE
8from one.alf import spec
10"""dict: custom sign off categories"""
11SIGN_OFF_CATEGORIES = {'neuropixel': ['raw', 'spike_sorting', 'alignment']}
14class QC:
15 """A base class for data quality control."""
17 def __init__(self, endpoint_id, one=None, log=None, endpoint='sessions'):
18 """
19 A base class for data quality control.
21 :param endpoint_id: Eid for endpoint. If using sessions can also be a session path
22 :param log: A logging.Logger instance, if None the 'ibllib' logger is used
23 :param one: An ONE instance for fetching and setting the QC on Alyx
24 :param endpoint: The endpoint name to apply qc to. Default is 'sessions'
25 """
26 self.one = one or ONE() 1bfghdielcamopnqr
27 self.log = log or logging.getLogger(__name__) 1bfghdielcamopnqr
28 if endpoint == 'sessions': 1bfghdielcamopnqr
29 self.endpoint = endpoint 1blcamopnqr
30 self._set_eid_or_path(endpoint_id) 1blcamopnqr
31 self.json = False 1blcamopnqr
32 else:
33 self.endpoint = endpoint 1bfghdieca
34 self._confirm_endpoint_id(endpoint_id) 1bfghdieca
36 # Ensure outcome attribute matches Alyx record
37 updatable = self.eid and self.one and not self.one.offline 1bfghdielcamopnqr
38 self._outcome = self.update('NOT_SET', namespace='') if updatable else spec.QC.NOT_SET 1bfghdielcamopnqr
39 self.log.debug(f'Current QC status is {self.outcome}') 1bfghdielcamopnqr
41 @abstractmethod
42 def run(self):
43 """Run the QC tests and return the outcome.
45 :return: One of "CRITICAL", "FAIL", "WARNING" or "PASS"
46 """
47 pass
49 @abstractmethod
50 def load_data(self):
51 """Load the data required to compute the QC.
53 Subclasses may implement this for loading raw data.
54 """
55 pass
57 @property
58 def outcome(self):
59 """one.alf.spec.QC: The overall session outcome."""
60 return self._outcome 1bfghdielkcamopnqrj
62 @outcome.setter
63 def outcome(self, value):
64 value = spec.QC.validate(value) # ensure valid enum 1blamopn
65 if self._outcome < value: 1blamopn
66 self._outcome = value 1l
68 @staticmethod
69 def overall_outcome(outcomes: iter, agg=max) -> spec.QC:
70 """
71 Given an iterable of QC outcomes, returns the overall (i.e. worst) outcome.
73 Example:
74 QC.overall_outcome(['PASS', 'NOT_SET', None, 'FAIL']) # Returns 'FAIL'
76 Parameters
77 ----------
78 outcomes : iterable of one.alf.spec.QC, str or int
79 An iterable of QC outcomes.
80 agg : function
81 Outcome code aggregate function, default is max (i.e. worst).
83 Returns
84 -------
85 one.alf.spec.QC
86 The overall outcome.
87 """
88 outcomes = filter(lambda x: x not in (None, np.nan), outcomes) 1bvwxyzABCDcEamnFqrGj
89 return agg(map(spec.QC.validate, outcomes)) 1bvwxyzABCDcEamnFqrGj
91 def _set_eid_or_path(self, session_path_or_eid):
92 """Parse a given eID or session path.
94 If a session UUID is given, resolves and stores the local path and vice versa
95 :param session_path_or_eid: A session eid or path
96 :return:
97 """
98 self.eid = None 1bulcamopnqr
99 if spec.is_uuid_string(str(session_path_or_eid)): 1bulcamopnqr
100 self.eid = session_path_or_eid 1bul
101 # Try to set session_path if data is found locally
102 self.session_path = self.one.eid2path(self.eid) 1bul
103 elif spec.is_session_path(session_path_or_eid): 1bucamopnqr
104 self.session_path = Path(session_path_or_eid) 1bucamopnqr
105 if self.one is not None: 1bucamopnqr
106 self.eid = self.one.path2eid(self.session_path) 1bucamopnqr
107 if not self.eid: 1bucamopnqr
108 self.log.warning('Failed to determine eID from session path')
109 else:
110 self.log.error('Cannot run QC: an experiment uuid or session path is required') 1u
111 raise ValueError("'session' must be a valid session path or uuid") 1u
113 def _confirm_endpoint_id(self, endpoint_id):
114 # Have as read for now since 'list' isn't working
115 target_obj = self.one.alyx.get(f'/{self.endpoint}/{endpoint_id}', clobber=True) or None 1bfghdieca
116 default_data = {} 1bfghdieca
117 if target_obj: 1bfghdieca
118 self.json = 'qc' not in target_obj 1bfghdieca
119 self.eid = endpoint_id 1bfghdieca
120 if self.json: 1bfghdieca
121 default_data['qc'] = 'NOT_SET' 1bfghdie
122 if 'extended_qc' not in target_obj: 1bfghdieca
123 default_data['extended_qc'] = {} 1bfghdieca
125 if not default_data: 1bfghdieca
126 return # No need to set up JSON for QC
127 json_field = target_obj.get('json') 1bfghdieca
128 if not json_field or (self.json and not json_field.get('qc', None)): 1bfghdieca
129 self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid, 1bfgha
130 field_name='json', data=default_data)
131 else:
132 self.log.error('Cannot run QC: endpoint id is not recognised')
133 raise ValueError("'endpoint_id' must be a valid uuid")
135 def update(self, outcome=None, namespace='experimenter', override=False):
136 """Update the qc field in Alyx.
138 Updates the 'qc' field in Alyx if the new QC outcome is worse than the current value.
140 Parameters
141 ----------
142 outcome : str, int, one.alf.spec.QC
143 A QC outcome; one of "CRITICAL", "FAIL", "WARNING", "PASS" or "NOT_SET".
144 namespace : str
145 The extended QC key specifying the type of QC associated with the outcome.
146 override : bool
147 If True the QC field is updated even if new value is better than previous.
149 Returns
150 -------
151 one.alf.spec.QC
152 The current QC outcome on Alyx.
154 Example
155 -------
156 >>> qc = QC('path/to/session')
157 >>> qc.update('PASS') # Update current QC field to 'PASS' if not set
158 """
159 assert self.one, 'instance of one should be provided' 1bfghdielkcaj
160 if self.one.offline: 1bfghdielkcaj
161 self.log.warning('Running on OneOffline instance, unable to update remote QC')
162 return
163 outcome = spec.QC.validate(self.outcome if outcome is None else outcome) 1bfghdielkcaj
164 assert self.eid, 'Unable to update Alyx; eID not set' 1bfghdielkcaj
165 if namespace: # Record in extended qc 1bfghdielkcaj
166 self.update_extended_qc({namespace: outcome.name}) 1dekaj
167 details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True) 1bfghdielkcaj
168 current_status = (details['json'] if self.json else details)['qc'] 1bfghdielkcaj
169 current_status = spec.QC.validate(current_status) 1bfghdielkcaj
171 if current_status < outcome or override: 1bfghdielkcaj
172 r = self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid, 1dekcaj
173 field_name='json', data={'qc': outcome.name}) \
174 if self.json else self.one.alyx.rest(self.endpoint, 'partial_update', id=self.eid,
175 data={'qc': outcome.name})
177 current_status = spec.QC.validate(r['qc']) 1dekcaj
178 assert current_status == outcome, 'Failed to update session QC' 1dekcaj
179 self.log.info(f'QC field successfully updated to {outcome.name} for {self.endpoint[:-1]} ' 1dekcaj
180 f'{self.eid}')
181 self._outcome = current_status 1bfghdielkcaj
182 return self.outcome 1bfghdielkcaj
184 def update_extended_qc(self, data):
185 """Update the extended_qc field in Alyx.
187 Subclasses should chain a call to this.
188 :param data: a dict of qc tests and their outcomes, typically a value between 0. and 1.
189 :return: the updated extended_qc field
190 """
191 assert self.eid, 'Unable to update Alyx; eID not set' 1fghdietkcaj
192 assert self.one, 'instance of one should be provided' 1fghdietkcaj
193 if self.one.offline: 1fghdietkcaj
194 self.log.warning('Running on OneOffline instance, unable to update remote QC')
195 return
197 # Ensure None instead of NaNs
198 for k, v in data.items(): 1fghdietkcaj
199 if v is not None and not isinstance(v, str): 1fghdietkcaj
200 if isinstance(v, tuple): 1fghdietcaj
201 data[k] = tuple(None if not isinstance(i, str) and np.isnan(i) else i for i in v) 1a
202 else:
203 data[k] = None if np.isnan(v).all() else v 1fghdietcaj
205 details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True) 1fghdietkcaj
206 if 'extended_qc' not in details: 1fghdietkcaj
207 extended_qc = details['json']['extended_qc'] or {} 1fghdieca
208 extended_qc.update(data) 1fghdieca
209 extended_qc_dict = {'extended_qc': extended_qc} 1fghdieca
210 out = self.one.alyx.json_field_update( 1fghdieca
211 endpoint=self.endpoint, uuid=self.eid, field_name='json', data=extended_qc_dict)
212 else:
213 extended_qc = details['extended_qc'] or {} 1tkaj
214 extended_qc.update(data) 1tkaj
215 out = self.one.alyx.json_field_update( 1tkaj
216 endpoint=self.endpoint, uuid=self.eid, field_name='extended_qc', data=extended_qc)
218 self.log.info(f'Extended QC field successfully updated for {self.endpoint[:-1]} ' 1fghdietkcaj
219 f'{self.eid}')
220 return out 1fghdietkcaj
222 def compute_outcome_from_extended_qc(self) -> str:
223 """Return the session outcome computed from aggregating the extended QC."""
224 details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True) 1v
225 extended_qc = details['json']['extended_qc'] if self.json else details['extended_qc'] 1v
226 return self.overall_outcome(v for k, v in extended_qc.items() or {} if k[0] != '_') 1v
229def sign_off_dict(exp_dec, sign_off_categories=None):
230 """
231 Create sign off dictionary.
233 Creates a dict containing 'sign off' keys for each device and task protocol in the provided
234 experiment description.
236 Parameters
237 ----------
238 exp_dec : dict
239 A loaded experiment description file.
240 sign_off_categories : dict of list
241 A dictionary of custom JSON keys for a given device in the acquisition description file.
243 Returns
244 -------
245 dict of dict
246 The sign off dictionary with the main key 'sign_off_checklist' containing keys for each
247 device and task protocol.
248 """
249 # Note this assumes devices each contain a dict of dicts
250 # e.g. {'devices': {'DAQ_1': {'device_1': {}, 'device_2': {}},}
251 sign_off_categories = sign_off_categories or SIGN_OFF_CATEGORIES 1sa
252 sign_off_keys = set() 1sa
253 for k, v in exp_dec.get('devices', {}).items(): 1sa
254 assert isinstance(v, dict) and v 1sa
255 if len(v.keys()) == 1 and next(iter(v.keys())) == k: 1sa
256 if k in sign_off_categories: 1sa
257 for subkey in sign_off_categories[k]: 1s
258 sign_off_keys.add(f'{k}_{subkey}') 1s
259 else:
260 sign_off_keys.add(k) 1sa
261 else:
262 for kk in v.keys(): 1sa
263 if k in sign_off_categories: 1sa
264 for subkey in sign_off_categories[k]: 1s
265 sign_off_keys.add(f'{k}_{subkey}_{kk}') 1s
266 else:
267 sign_off_keys.add(f'{k}_{kk}') 1sa
269 # Add keys for each protocol
270 for i, v in enumerate(chain(*map(dict.keys, exp_dec.get('tasks', [])))): 1sa
271 sign_off_keys.add(f'{v}_{i:02}') 1sa
273 return {'sign_off_checklist': dict.fromkeys(map(lambda x: f'_{x}', sign_off_keys))} 1sa