Coverage for ibllib/qc/base.py: 93%
133 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
1import logging
2from abc import abstractmethod
3from pathlib import Path
4from itertools import chain
6import numpy as np
7from one.api import ONE
8from one.alf import spec
"""dict: custom sign off categories"""
# Maps a device name to the list of sub-categories that each need their own sign-off.
# `sign_off_dict` uses this as its default: for a device listed here it emits one key per
# sub-category (e.g. 'neuropixel_raw') instead of a single key for the device.
SIGN_OFF_CATEGORIES = {'neuropixel': ['raw', 'spike_sorting', 'alignment']}
class QC:
    """A base class for data quality control.

    Holds an Alyx endpoint record identifier together with a ONE instance and provides
    helpers for reading, aggregating and writing the record's 'qc' and 'extended_qc' fields.
    Subclasses are expected to implement `run` and `load_data`.
    """

    def __init__(self, endpoint_id, one=None, log=None, endpoint='sessions'):
        """
        A base class for data quality control.

        Parameters
        ----------
        endpoint_id : str, uuid.UUID, pathlib.Path
            Eid for endpoint. If using sessions can also be a session path.
        one : one.api.OneAlyx
            An ONE instance for fetching and setting the QC on Alyx. If None, a new ONE
            instance is created.
        log : logging.Logger
            A logging.Logger instance, if None this module's logger is used.
        endpoint : str
            The endpoint name to apply qc to. Default is 'sessions'.

        Raises
        ------
        ValueError
            The endpoint_id is neither a valid session path nor a UUID (sessions endpoint),
            or no record was returned for it (any other endpoint).
        """
        self.one = one or ONE()
        self.log = log or logging.getLogger(__name__)
        self.endpoint = endpoint
        if endpoint == 'sessions':
            self._set_eid_or_path(endpoint_id)
            # Session records are read through their own 'qc' field (see `update`)
            self.json = False
        else:
            # Sets self.eid and self.json depending on the fields of the remote record
            self._confirm_endpoint_id(endpoint_id)

        # Ensure outcome attribute matches Alyx record
        updatable = self.eid and self.one and not self.one.offline
        if updatable:
            # An empty namespace skips the extended QC; NOT_SET is the lowest outcome so this
            # call simply returns the status currently stored on Alyx
            self._outcome = self.update('NOT_SET', namespace='')
        else:
            self._outcome = spec.QC.NOT_SET
        self.log.debug(f'Current QC status is {self.outcome}')

    @abstractmethod
    def run(self):
        """Run the QC tests and return the outcome.

        :return: One of "CRITICAL", "FAIL", "WARNING" or "PASS"
        """
        pass

    @abstractmethod
    def load_data(self):
        """Load the data required to compute the QC.

        Subclasses may implement this for loading raw data.
        """
        pass

    @property
    def outcome(self):
        """one.alf.spec.QC: The overall session outcome."""
        return self._outcome

    @outcome.setter
    def outcome(self, value):
        value = spec.QC.validate(value)  # ensure valid enum
        # The outcome may only get worse; a better value is silently ignored
        if self._outcome < value:
            self._outcome = value

    @staticmethod
    def overall_outcome(outcomes: iter, agg=max) -> 'spec.QC':
        """
        Given an iterable of QC outcomes, returns the overall (i.e. worst) outcome.

        None and NaN values are ignored.

        Example:
          QC.overall_outcome(['PASS', 'NOT_SET', None, 'FAIL'])  # Returns 'FAIL'

        Parameters
        ----------
        outcomes : iterable of one.alf.spec.QC, str or int
            An iterable of QC outcomes.
        agg : function
            Outcome code aggregate function, default is max (i.e. worst).

        Returns
        -------
        one.alf.spec.QC
            The overall outcome.

        Raises
        ------
        ValueError
            With the default aggregate function, raised when no outcomes remain after the
            None and NaN values are removed.
        """
        def _is_set(value):
            # NaN is the only float that differs from itself. This catches every NaN object
            # (float('nan'), numpy floats, ...) without relying on the `np.NaN` alias, which
            # was removed in NumPy 2.0.
            return value is not None and not (isinstance(value, float) and value != value)

        return agg(map(spec.QC.validate, filter(_is_set, outcomes)))

    def _set_eid_or_path(self, session_path_or_eid):
        """Parse a given eID or session path.

        If a session UUID is given, resolves and stores the local path and vice versa.

        Parameters
        ----------
        session_path_or_eid : str, uuid.UUID, pathlib.Path
            A session eid or path.

        Raises
        ------
        ValueError
            The input is neither a UUID string nor a valid session path.
        """
        self.eid = None
        if spec.is_uuid_string(str(session_path_or_eid)):
            self.eid = session_path_or_eid
            # Try to set session_path if data is found locally
            self.session_path = self.one.eid2path(self.eid)
        elif spec.is_session_path(session_path_or_eid):
            self.session_path = Path(session_path_or_eid)
            if self.one is not None:
                self.eid = self.one.path2eid(self.session_path)
                if not self.eid:
                    self.log.warning('Failed to determine eID from session path')
        else:
            self.log.error('Cannot run QC: an experiment uuid or session path is required')
            raise ValueError("'session' must be a valid session path or uuid")

    def _confirm_endpoint_id(self, endpoint_id):
        """Check the endpoint record exists and prepare its JSON field for QC if required.

        Sets `self.eid`, and sets `self.json` to True when the record has no 'qc' field of
        its own, in which case the QC is kept within the record's 'json' field instead.

        Parameters
        ----------
        endpoint_id : str, uuid.UUID
            The ID of a record of the `self.endpoint` endpoint.

        Raises
        ------
        ValueError
            No record was returned for the given endpoint ID.
        """
        # Have as read for now since 'list' isn't working
        target_obj = self.one.alyx.get(f'/{self.endpoint}/{endpoint_id}', clobber=True) or None
        if not target_obj:
            self.log.error('Cannot run QC: endpoint id is not recognised')
            raise ValueError("'endpoint_id' must be a valid uuid")

        self.json = 'qc' not in target_obj
        self.eid = endpoint_id

        # Collect the QC keys that must live in the JSON field for this record
        default_data = {}
        if self.json:
            default_data['qc'] = 'NOT_SET'
        if 'extended_qc' not in target_obj:
            default_data['extended_qc'] = {}
        if not default_data:
            return  # No need to set up JSON for QC

        # Only write the defaults when the JSON field is empty or has no QC value yet
        json_field = target_obj.get('json')
        if not json_field or (self.json and not json_field.get('qc', None)):
            self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
                                            field_name='json', data=default_data)

    def update(self, outcome=None, namespace='experimenter', override=False):
        """Update the qc field in Alyx.

        Updates the 'qc' field in Alyx if the new QC outcome is worse than the current value.

        Parameters
        ----------
        outcome : str, int, one.alf.spec.QC
            A QC outcome; one of "CRITICAL", "FAIL", "WARNING", "PASS" or "NOT_SET".
            If None, the instance's current outcome is used.
        namespace : str
            The extended QC key specifying the type of QC associated with the outcome.
            If empty, the extended QC is not updated.
        override : bool
            If True the QC field is updated even if new value is better than previous.

        Returns
        -------
        one.alf.spec.QC
            The current QC outcome on Alyx, or None if the ONE instance is in offline mode.

        Example
        -------
        >>> qc = QC('path/to/session')
        >>> qc.update('PASS')  # Update current QC field to 'PASS' if not set
        """
        assert self.one, 'instance of one should be provided'
        if self.one.offline:
            self.log.warning('Running on OneOffline instance, unable to update remote QC')
            return
        outcome = spec.QC.validate(self.outcome if outcome is None else outcome)
        assert self.eid, 'Unable to update Alyx; eID not set'
        if namespace:  # Record in extended qc
            self.update_extended_qc({namespace: outcome.name})
        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        # The outcome lives in the JSON field for records without a 'qc' field of their own
        current_status = (details['json'] if self.json else details)['qc']
        current_status = spec.QC.validate(current_status)

        if current_status < outcome or override:
            if self.json:
                r = self.one.alyx.json_field_update(
                    endpoint=self.endpoint, uuid=self.eid,
                    field_name='json', data={'qc': outcome.name})
            else:
                r = self.one.alyx.rest(
                    self.endpoint, 'partial_update', id=self.eid, data={'qc': outcome.name})

            # Confirm that the value returned by Alyx matches the one requested
            current_status = spec.QC.validate(r['qc'])
            assert current_status == outcome, 'Failed to update session QC'
            self.log.info(f'QC field successfully updated to {outcome.name} for '
                          f'{self.endpoint[:-1]} {self.eid}')
        self._outcome = current_status
        return self.outcome

    def update_extended_qc(self, data):
        """Update the extended_qc field in Alyx.

        Subclasses should chain a call to this. NaN values are replaced with None before
        the upload; the dict passed in is left unmodified.

        Parameters
        ----------
        data : dict
            A dict of qc tests and their outcomes, typically a value between 0. and 1.

        Returns
        -------
        dict
            The updated extended_qc field as returned by Alyx, or None if the ONE instance
            is in offline mode.
        """
        assert self.eid, 'Unable to update Alyx; eID not set'
        assert self.one, 'instance of one should be provided'
        if self.one.offline:
            self.log.warning('Running on OneOffline instance, unable to update remote QC')
            return

        def _nan_to_none(value):
            """Return value with any NaN replaced by None (strings and None pass through)."""
            if value is None or isinstance(value, str):
                return value
            if isinstance(value, tuple):
                return tuple(
                    None if not isinstance(i, str) and np.isnan(i) else i for i in value)
            return None if np.isnan(value).all() else value

        # Ensure None instead of NaNs; build a new dict so the caller's data is untouched
        data = {key: _nan_to_none(value) for key, value in data.items()}

        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        if 'extended_qc' not in details:
            # No dedicated field on this record: the extended QC is kept in the JSON field,
            # which may be empty or may not contain an 'extended_qc' key yet
            extended_qc = (details.get('json') or {}).get('extended_qc') or {}
            extended_qc.update(data)
            out = self.one.alyx.json_field_update(
                endpoint=self.endpoint, uuid=self.eid, field_name='json',
                data={'extended_qc': extended_qc})
        else:
            extended_qc = details['extended_qc'] or {}
            extended_qc.update(data)
            out = self.one.alyx.json_field_update(
                endpoint=self.endpoint, uuid=self.eid, field_name='extended_qc',
                data=extended_qc)

        self.log.info(f'Extended QC field successfully updated for {self.endpoint[:-1]} '
                      f'{self.eid}')
        return out

    def compute_outcome_from_extended_qc(self) -> 'spec.QC':
        """Return the session outcome computed from aggregating the extended QC.

        Extended QC keys beginning with an underscore are excluded from the aggregation.

        Returns
        -------
        one.alf.spec.QC
            The worst outcome of the remaining extended QC values; NOT_SET if the extended
            QC is missing, empty or contains only underscore-prefixed keys.
        """
        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        if self.json:
            extended_qc = (details.get('json') or {}).get('extended_qc')
        else:
            extended_qc = details.get('extended_qc')
        outcomes = [v for k, v in (extended_qc or {}).items() if not k.startswith('_')]
        if not outcomes:
            return spec.QC.NOT_SET  # nothing to aggregate
        return self.overall_outcome(outcomes)
def sign_off_dict(exp_dec, sign_off_categories=None):
    """
    Create sign off dictionary.

    Creates a dict containing 'sign off' keys for each device and task protocol in the provided
    experiment description. Keys appear in a reproducible order: devices first, in the order
    they occur in the description, followed by the task protocols.

    Parameters
    ----------
    exp_dec : dict
        A loaded experiment description file.
    sign_off_categories : dict of list
        A dictionary of custom JSON keys for a given device in the acquisition description file.
        Defaults to SIGN_OFF_CATEGORIES.

    Returns
    -------
    dict of dict
        The sign off dictionary with the main key 'sign_off_checklist' containing keys for each
        device and task protocol. Every key is prefixed with an underscore and maps to None.

    Raises
    ------
    AssertionError
        A value under 'devices' is not a non-empty dict.
    """
    # Note this assumes devices each contain a dict of dicts
    # e.g. {'devices': {'DAQ_1': {'device_1': {}, 'device_2': {}},}
    sign_off_categories = sign_off_categories or SIGN_OFF_CATEGORIES
    # A dict serves as an insertion-ordered set; a plain set of strings would yield a
    # different key order from one interpreter run to the next
    sign_off_keys = {}
    for device, sub_devices in (exp_dec.get('devices') or {}).items():
        assert isinstance(sub_devices, dict) and sub_devices, \
            f'devices entry "{device}" must be a non-empty dict'
        # Devices with custom categories get one key per category instead of a single key
        if device in sign_off_categories:
            prefixes = [f'{device}_{subkey}' for subkey in sign_off_categories[device]]
        else:
            prefixes = [device]
        if len(sub_devices) == 1 and next(iter(sub_devices)) == device:
            # A sole sub-device named after the device itself adds no suffix
            names = prefixes
        else:
            names = [f'{prefix}_{name}' for name in sub_devices for prefix in prefixes]
        sign_off_keys.update(dict.fromkeys(names))

    # Add keys for each protocol, numbered by its position across all task entries
    for i, protocol in enumerate(chain.from_iterable(exp_dec.get('tasks') or [])):
        sign_off_keys[f'{protocol}_{i:02}'] = None

    return {'sign_off_checklist': {f'_{key}': None for key in sign_off_keys}}