Coverage for ibllib/qc/base.py: 93%

133 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1import logging 

2from abc import abstractmethod 

3from pathlib import Path 

4from itertools import chain 

5 

6import numpy as np 

7from one.api import ONE 

8from one.alf import spec 

9 

#: dict: custom sign off categories, mapping a device name to the list of sub-categories that
#: each require their own sign-off key (see `sign_off_dict`).
SIGN_OFF_CATEGORIES = {'neuropixel': ['raw', 'spike_sorting', 'alignment']}

12 

13 

class QC:
    """A base class for data quality control.

    Subclasses are expected to implement `run` and `load_data`.  Note that this class does not
    derive from `abc.ABC`, so the `abstractmethod` markers below are not enforced when the class
    is instantiated.
    """

    def __init__(self, endpoint_id, one=None, log=None, endpoint='sessions'):
        """
        A base class for data quality control.

        :param endpoint_id: Eid for endpoint. If using sessions can also be a session path
        :param one: A ONE instance for fetching and setting the QC on Alyx. If None, a default
         ONE() is instantiated
        :param log: A logging.Logger instance, if None the logger named after this module is used
        :param endpoint: The endpoint name to apply qc to. Default is 'sessions'
        """
        self.one = one or ONE()
        self.log = log or logging.getLogger(__name__)
        self.endpoint = endpoint
        if endpoint == 'sessions':
            self._set_eid_or_path(endpoint_id)
            # For sessions the 'qc' field is read directly from the record, not from its JSON
            self.json = False
        else:
            # Sets self.eid and self.json depending on the fields of the remote record
            self._confirm_endpoint_id(endpoint_id)

        # Ensure outcome attribute matches Alyx record
        updatable = self.eid and self.one and not self.one.offline
        self._outcome = self.update('NOT_SET', namespace='') if updatable else spec.QC.NOT_SET
        self.log.debug(f'Current QC status is {self.outcome}')

    @abstractmethod
    def run(self):
        """Run the QC tests and return the outcome.

        :return: One of "CRITICAL", "FAIL", "WARNING" or "PASS"
        """
        pass

    @abstractmethod
    def load_data(self):
        """Load the data required to compute the QC.

        Subclasses may implement this for loading raw data.
        """
        pass

    @property
    def outcome(self):
        """one.alf.spec.QC: The overall session outcome."""
        return self._outcome

    @outcome.setter
    def outcome(self, value):
        """Set the outcome; the stored value only changes if `value` is greater than it."""
        value = spec.QC.validate(value)  # ensure valid enum
        if self._outcome < value:
            self._outcome = value

    @staticmethod
    def _is_missing(value):
        """Return True if `value` is None or a floating point NaN."""
        # NaN never compares equal to itself, so an explicit isnan check is required
        return value is None or (isinstance(value, (float, np.floating)) and bool(np.isnan(value)))

    @staticmethod
    def _nan_to_none(value):
        """Return `value` with NaNs replaced by None.

        None and str inputs are returned unchanged.  Tuples are processed element-wise (None and
        str elements are kept as they are); any other input is replaced by None only if all of
        its values are NaN.
        """
        if value is None or isinstance(value, str):
            return value
        if isinstance(value, tuple):
            return tuple(
                i if i is None or isinstance(i, str) or not np.isnan(i) else None for i in value)
        return None if np.isnan(value).all() else value

    @staticmethod
    def overall_outcome(outcomes: iter, agg=max) -> 'spec.QC':
        """
        Given an iterable of QC outcomes, returns the overall (i.e. worst) outcome.

        None and NaN values are ignored.

        Example:
          QC.overall_outcome(['PASS', 'NOT_SET', None, 'FAIL'])  # Returns 'FAIL'

        Parameters
        ----------
        outcomes : iterable of one.alf.spec.QC, str or int
            An iterable of QC outcomes.
        agg : function
            Outcome code aggregate function, default is max (i.e. worst).  It is called with an
            iterator of validated outcomes; what happens when no outcomes remain after filtering
            is up to this function (the default `max` raises a ValueError).

        Returns
        -------
        one.alf.spec.QC
            The overall outcome.
        """
        valid = (x for x in outcomes if not QC._is_missing(x))
        return agg(map(spec.QC.validate, valid))

    def _set_eid_or_path(self, session_path_or_eid):
        """Parse a given eID or session path.

        If a session UUID is given, resolves and stores the local path and vice versa
        :param session_path_or_eid: A session eid or path
        :return:
        :raises ValueError: the input is neither a UUID string nor a session path
        """
        self.eid = None
        if spec.is_uuid_string(str(session_path_or_eid)):
            self.eid = session_path_or_eid
            # Try to set session_path if data is found locally
            self.session_path = self.one.eid2path(self.eid)
        elif spec.is_session_path(session_path_or_eid):
            self.session_path = Path(session_path_or_eid)
            if self.one is not None:
                self.eid = self.one.path2eid(self.session_path)
                if not self.eid:
                    self.log.warning('Failed to determine eID from session path')
        else:
            self.log.error('Cannot run QC: an experiment uuid or session path is required')
            raise ValueError("'session' must be a valid session path or uuid")

    def _confirm_endpoint_id(self, endpoint_id):
        """Check the endpoint record exists and prepare its JSON field for QC if required.

        Sets `self.eid` and `self.json`; the latter is True when the record has no 'qc' field of
        its own, in which case the QC is kept in the record's 'json' field.

        :param endpoint_id: The id of the record on `self.endpoint`
        :raises ValueError: no record was returned for the given id
        """
        # Have as read for now since 'list' isn't working
        target_obj = self.one.alyx.get(f'/{self.endpoint}/{endpoint_id}', clobber=True) or None
        if not target_obj:
            self.log.error('Cannot run QC: endpoint id is not recognised')
            raise ValueError("'endpoint_id' must be a valid uuid")

        self.json = 'qc' not in target_obj
        self.eid = endpoint_id
        # Collect the QC fields missing from the record; these must live in the JSON field
        default_data = {}
        if self.json:
            default_data['qc'] = 'NOT_SET'
        if 'extended_qc' not in target_obj:
            default_data['extended_qc'] = {}

        if not default_data:
            return  # No need to set up JSON for QC
        json_field = target_obj.get('json')
        if not json_field or (self.json and not json_field.get('qc', None)):
            self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
                                            field_name='json', data=default_data)

    def update(self, outcome=None, namespace='experimenter', override=False):
        """Update the qc field in Alyx.

        Updates the 'qc' field in Alyx if the new QC outcome is worse than the current value.

        Parameters
        ----------
        outcome : str, int, one.alf.spec.QC
            A QC outcome; one of "CRITICAL", "FAIL", "WARNING", "PASS" or "NOT_SET".  If None,
            the current value of the `outcome` property is used.
        namespace : str
            The extended QC key specifying the type of QC associated with the outcome.  If empty,
            the extended QC is not updated.
        override : bool
            If True the QC field is updated even if new value is better than previous.

        Returns
        -------
        one.alf.spec.QC
            The current QC outcome on Alyx.  None is returned, and nothing is updated, when the
            ONE instance is in offline mode.

        Example
        -------
        >>> qc = QC('path/to/session')
        >>> qc.update('PASS')  # Update current QC field to 'PASS' if not set
        """
        assert self.one, 'instance of one should be provided'
        if self.one.offline:
            self.log.warning('Running on OneOffline instance, unable to update remote QC')
            return
        outcome = spec.QC.validate(self.outcome if outcome is None else outcome)
        assert self.eid, 'Unable to update Alyx; eID not set'
        if namespace:  # Record in extended qc
            self.update_extended_qc({namespace: outcome.name})
        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        current_status = (details['json'] if self.json else details)['qc']
        current_status = spec.QC.validate(current_status)

        if current_status < outcome or override:
            if self.json:
                # The QC lives in the record's JSON field
                r = self.one.alyx.json_field_update(
                    endpoint=self.endpoint, uuid=self.eid,
                    field_name='json', data={'qc': outcome.name})
            else:
                r = self.one.alyx.rest(
                    self.endpoint, 'partial_update', id=self.eid, data={'qc': outcome.name})

            current_status = spec.QC.validate(r['qc'])
            assert current_status == outcome, 'Failed to update session QC'
            self.log.info(f'QC field successfully updated to {outcome.name} for {self.endpoint[:-1]} '
                          f'{self.eid}')
        self._outcome = current_status
        return self.outcome

    def update_extended_qc(self, data):
        """Update the extended_qc field in Alyx.

        Subclasses should chain a call to this.  Note that `data` is modified in place: NaN
        values are replaced with None before the update is sent.

        :param data: a dict of qc tests and their outcomes, typically a value between 0. and 1.
        :return: the updated extended_qc field, or None if the ONE instance is in offline mode
        """
        assert self.eid, 'Unable to update Alyx; eID not set'
        assert self.one, 'instance of one should be provided'
        if self.one.offline:
            self.log.warning('Running on OneOffline instance, unable to update remote QC')
            return

        # Ensure None instead of NaNs
        for k, v in data.items():
            data[k] = self._nan_to_none(v)

        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        if 'extended_qc' not in details:
            # No dedicated field on this record: the extended QC is nested in the JSON field
            extended_qc = details['json']['extended_qc'] or {}
            extended_qc.update(data)
            extended_qc_dict = {'extended_qc': extended_qc}
            out = self.one.alyx.json_field_update(
                endpoint=self.endpoint, uuid=self.eid, field_name='json', data=extended_qc_dict)
        else:
            extended_qc = details['extended_qc'] or {}
            extended_qc.update(data)
            out = self.one.alyx.json_field_update(
                endpoint=self.endpoint, uuid=self.eid, field_name='extended_qc', data=extended_qc)

        self.log.info(f'Extended QC field successfully updated for {self.endpoint[:-1]} '
                      f'{self.eid}')
        return out

    def compute_outcome_from_extended_qc(self) -> 'spec.QC':
        """Return the session outcome computed from aggregating the extended QC.

        Extended QC keys beginning with an underscore are excluded from the aggregate.  A null
        extended QC field is treated as empty.
        """
        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        extended_qc = details['json']['extended_qc'] if self.json else details['extended_qc']
        # Guard against a null field before calling .items()
        items = (extended_qc or {}).items()
        return self.overall_outcome(v for k, v in items if not k.startswith('_'))

227 

228 

def sign_off_dict(exp_dec, sign_off_categories=None):
    """
    Create sign off dictionary.

    Creates a dict containing 'sign off' keys for each device and task protocol in the provided
    experiment description.

    Parameters
    ----------
    exp_dec : dict
        A loaded experiment description file.  Missing or null 'devices' and 'tasks' entries are
        treated as empty.
    sign_off_categories : dict of list
        A dictionary of custom JSON keys for a given device in the acquisition description file.
        If None or empty, SIGN_OFF_CATEGORIES is used.

    Returns
    -------
    dict of dict
        The sign off dictionary with the main key 'sign_off_checklist' containing keys for each
        device and task protocol.  Each key is prefixed with an underscore and maps to None.
        Keys appear in the order of the experiment description (devices first, then tasks).
    """
    # Note this assumes devices each contain a dict of dicts
    # e.g. {'devices': {'DAQ_1': {'device_1': {}, 'device_2': {}}}}
    sign_off_categories = sign_off_categories or SIGN_OFF_CATEGORIES
    # A dict serves as an insertion-ordered set so that the key order is reproducible
    checklist = {}
    for device, collections in (exp_dec.get('devices') or {}).items():
        assert isinstance(collections, dict) and collections, \
            f'device "{device}" must contain a non-empty dict'
        # Devices with custom categories get one key per category instead of a single key
        if device in sign_off_categories:
            prefixes = [f'{device}_{category}' for category in sign_off_categories[device]]
        else:
            prefixes = [device]
        # A single sub-key named after the device itself adds no suffix
        if len(collections) == 1 and next(iter(collections)) == device:
            suffixes = ['']
        else:
            suffixes = [f'_{name}' for name in collections]
        for suffix in suffixes:
            for prefix in prefixes:
                checklist[f'_{prefix}{suffix}'] = None

    # Add keys for each protocol, numbered by position across all task entries
    protocols = (name for task in exp_dec.get('tasks') or [] for name in task)
    for i, protocol in enumerate(protocols):
        checklist[f'_{protocol}_{i:02}'] = None

    return {'sign_off_checklist': checklist}

273 return {'sign_off_checklist': dict.fromkeys(map(lambda x: f'_{x}', sign_off_keys))} 1tab