# ibllib/pipes/remote_server.py
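
"""Remote-server pipeline jobs for Kilosort 2 (KS2) spike sorting.

NB: this summary is inferred from the code below. ``job_transfer_ks2`` stages a
probe's raw data from FlatIron to this machine through the ``ibl-ks2-storage``
S3 bucket and drops a ``sort_me.flag``; ``job_run_ks2`` picks up the oldest
flagged session, syncs its probes, runs the KS2 task and registers the outputs
via the FTP patcher.
"""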

import logging
import os
import re
import subprocess
from pathlib import Path, PosixPath

from ibllib.ephys import sync_probes
from ibllib.pipes import ephys_preprocessing as ephys
from ibllib.oneibl.patcher import FTPPatcher
from one.api import ONE

_logger = logging.getLogger(__name__)

FLATIRON_HOST = 'ibl.flatironinstitute.org'
FLATIRON_PORT = 61022
FLATIRON_USER = 'datauser'
root_path = '/mnt/s0/Data/'


def _run_command(cmd):
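    """Run a shell command; return ``(stdout, None)`` on success or ``(None, stderr)`` on failure."""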

    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    info, error = process.communicate()
    if process.returncode != 0:
        return None, error.decode('utf-8')
    else:
        return info.decode('utf-8').strip(), None


def job_transfer_ks2(probe_path):
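    """Transfer raw data for one probe from FlatIron to this server, via s3.

    :param probe_path: relative path of the probe folder on FlatIron, as a str
    """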

    assert isinstance(probe_path, str)

    def _get_volume_usage_percentage(vol):
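        """Return the 'Use%' figure reported by ``df`` for the given volume, as an int."""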

        cmd = f'df {vol}'
        res, _ = _run_command(cmd)
        size_list = re.split(' +', res.split('\n')[-1])
        per_usage = int(size_list[4][:-1])
        return per_usage

    # First check disk availability
    space = _get_volume_usage_percentage('/mnt/s0')
    # If we are less than 80% full we can transfer more stuff
    if space < 80:
        # Transfer data from flatiron to s3
        cmd = f'ssh -i ~/.ssh/mayo_alyx.pem -p {FLATIRON_PORT} ' \
              f'{FLATIRON_USER}@{FLATIRON_HOST} ./transfer_to_aws.sh {probe_path}'
        result, error = _run_command(cmd)

        # Check that command has run as expected and output info to logger
        if not result:
            _logger.error(f'{probe_path}: Could not transfer data from FlatIron to s3 \n'
                          f'Error: {error}')
            return
        else:
            _logger.info(f'{probe_path}: Data transferred from FlatIron to s3')

        # Transfer data from s3 to /mnt/s0/Data on aws
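        # NB: probe_path is assumed to look like
        # <lab>/Subjects/<subject>/<date>/<number>/raw_ephys_data/<probeXX>,
        # so two levels up is the session folder.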

        session = str(PosixPath(probe_path).parent.parent)
        cmd = f'aws s3 sync s3://ibl-ks2-storage/{session} "/mnt/s0/Data/{session}"'
        result, error = _run_command(cmd)

        # Check that command has run as expected and output info to logger
        if not result:
            _logger.error(f'{probe_path}: Could not transfer data from s3 to aws \n'
                          f'Error: {error}')
            return
        else:
            _logger.info(f'{probe_path}: Data transferred from s3 to aws')

        # Rename the files to get rid of eid associated with each dataset
        session_path = Path(root_path).joinpath(session)
        for file in session_path.glob('**/*'):
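            # Datasets downloaded from FlatIron carry a 36-char UUID between
            # name and extension (e.g. spikes.times.<uuid>.npy): the stem's own
            # suffix is then 37 chars ('.' + UUID) and is stripped below.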

            if len(Path(file.stem).suffix) == 37:
                file.rename(Path(file.parent, str(Path(file.stem).stem) + file.suffix))
                _logger.info(f'Renamed dataset {file.stem} to {str(Path(file.stem).stem)}')
            else:
                _logger.warning(f'Dataset {file.stem} not renamed')
                continue

        # Create a sort_me.flag
        cmd = f'touch /mnt/s0/Data/{session}/sort_me.flag'
        result, error = _run_command(cmd)
        _logger.info(f'{session}: sort_me.flag created')

        # Remove files from s3
        cmd = f'aws s3 rm --recursive s3://ibl-ks2-storage/{session}'
        result, error = _run_command(cmd)
        if not result:
            _logger.error(f'{session}: Could not remove data from s3 \n'
                          f'Error: {error}')
            return
        else:
            _logger.info(f'{session}: Data removed from s3')
    else:
        # Make the skipped-transfer branch visible in the logs
        _logger.warning(f'{probe_path}: /mnt/s0 is {space}% full, transfer skipped')

    return


def job_run_ks2():
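    """Run KS2 spike sorting on the oldest session flagged with ``sort_me.flag``."""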

    # Look for flag files in /mnt/s0/Data and sort them by modification time
    flag_files = list(Path(root_path).glob('**/sort_me.flag'))
    if not flag_files:
        # Guard added: nothing has been flagged for sorting
        return
    flag_files.sort(key=os.path.getmtime)

    # Start with the oldest flag
    session_path = flag_files[0].parent
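    # session_path is absolute, e.g. /mnt/s0/Data/<lab>/Subjects/<subject>/...;
    # parts[:4] are ('/', 'mnt', 's0', 'Data'), so parts[4:] is the session
    # path relative to root_path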

    session = str(PosixPath(*session_path.parts[4:]))
    flag_files[0].unlink()

    # Instantiate one
    one = ONE(cache_rest=None)

    # sync the probes
    status, sync_files = sync_probes.sync(session_path)

    if not status:
        _logger.error(f'{session}: Could not sync probes')
        return
    else:
        _logger.info(f'{session}: Probes successfully synced')

    # run ks2
    task = ephys.SpikeSorting(session_path, one=one)
    status = task.run()

    if status != 0:
        _logger.error(f'{session}: Could not run ks2')
        return
    else:
        _logger.info(f'{session}: ks2 successfully completed')

    # Run the cell qc
    # qc_file = []

    # Register and upload files to FTP Patcher
    outfiles = task.outputs
    ftp_patcher = FTPPatcher(one=one)
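    # create_dataset registers the datasets on Alyx and uploads the files over
    # FTP; ALYX_LOGIN is read from the locally cached ONE parameters (one._par)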

    ftp_patcher.create_dataset(path=outfiles, created_by=one._par.ALYX_LOGIN)

    # Remove everything apart from alf folder and spike sorter folder
    # Don't do this for now until we are sure it works for 3A and 3B!!
    # cmd = f'rm -r {session_path}/raw_ephys_data && rm -r {session_path}/raw_behavior_data'
    # result, error = _run_command(cmd)
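

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): how these jobs are launched
# isn't shown here; presumably a scheduler such as cron drives them on the
# remote server. The CLI wrapper below is a hypothetical convenience only.
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Remote-server KS2 jobs')
    parser.add_argument('job', choices=['transfer', 'run'],
                        help='stage data for one probe, or sort the oldest flagged session')
    parser.add_argument('--probe-path', default=None,
                        help='relative probe path on FlatIron (transfer job only)')
    args = parser.parse_args()
    if args.job == 'transfer':
        job_transfer_ks2(args.probe_path)
    else:
        job_run_ks2()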