Coverage for ibllib/pipes/remote_server.py: 0% (81 statements)
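
"""Remote KS2 spike-sorting jobs.

``job_transfer_ks2`` pulls raw electrophysiology data for one probe from the
FlatIron server onto this machine via an s3 bucket and flags the session for
sorting; ``job_run_ks2`` picks up the oldest flagged session, syncs the
probes, runs the KS2 spike sorting and registers the outputs via the FTP
patcher.
"""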
import logging
from pathlib import Path, PosixPath
import re
import subprocess
import os

from ibllib.ephys import sync_probes
from ibllib.pipes import ephys_preprocessing as ephys
from ibllib.oneibl.patcher import FTPPatcher
from one.api import ONE

_logger = logging.getLogger(__name__)

FLATIRON_HOST = 'ibl.flatironinstitute.org'
FLATIRON_PORT = 61022
FLATIRON_USER = 'datauser'
root_path = '/mnt/s0/Data/'


def _run_command(cmd):
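    """Run a shell command and capture its output.

    Returns a ``(stdout, stderr)`` pair of which exactly one element is None:
    ``(stdout, None)`` on success, ``(None, stderr)`` on a non-zero exit code.
    """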
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    info, error = process.communicate()
    if process.returncode != 0:
        return None, error.decode('utf-8')
    else:
        return info.decode('utf-8').strip(), None


def job_transfer_ks2(probe_path):
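    """Transfer raw data for one probe from FlatIron to this machine via s3.

    :param probe_path: probe folder path relative to the FlatIron data root,
        e.g. 'lab/Subjects/subject/date/number/raw_ephys_data/probe00'
        (illustrative); the folder two levels up is taken as the session path
    """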
    assert isinstance(probe_path, str)

    def _get_volume_usage_percentage(vol):
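        """Return the usage of a volume as an integer percentage.

        Parsed from the last line of ``df <vol>`` output, whose columns are
        Filesystem, 1K-blocks, Used, Available, Use%, Mounted on, e.g.
        ``/dev/sdb1 103079200 55023784 48055416 54% /mnt/s0`` (illustrative);
        field 4 is the ``Use%`` column.
        """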
        cmd = f'df {vol}'
        res, _ = _run_command(cmd)
        size_list = re.split(' +', res.split('\n')[-1])
        per_usage = int(size_list[4][:-1])
        return per_usage

    # First check disk availability
    space = _get_volume_usage_percentage('/mnt/s0')
    # If we are less than 80% full we can transfer more data
    if space < 80:
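        # transfer_to_aws.sh is assumed to live in the datauser home directory
        # on FlatIron; it pushes the probe folder to the s3://ibl-ks2-storage
        # bucket that is synced from below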
        # Transfer data from FlatIron to s3
        cmd = f'ssh -i ~/.ssh/mayo_alyx.pem -p {FLATIRON_PORT} ' \
              f'{FLATIRON_USER}@{FLATIRON_HOST} ./transfer_to_aws.sh {probe_path}'
        result, error = _run_command(cmd)

        # Check that the command ran as expected and report to the logger
        # (_run_command returns None for result on failure; an explicit None
        # check avoids false alarms when a command succeeds with empty output)
        if result is None:
            _logger.error(f'{probe_path}: Could not transfer data from FlatIron to s3 \n'
                          f'Error: {error}')
            return
        else:
            _logger.info(f'{probe_path}: Data transferred from FlatIron to s3')

        # Transfer data from s3 to /mnt/s0/Data on AWS
        session = str(PosixPath(probe_path).parent.parent)
        cmd = f'aws s3 sync s3://ibl-ks2-storage/{session} "/mnt/s0/Data/{session}"'
        result, error = _run_command(cmd)

        # Check that the command ran as expected and report to the logger
        if result is None:
            _logger.error(f'{probe_path}: Could not transfer data from s3 to aws \n'
                          f'Error: {error}')
            return
        else:
            _logger.info(f'{probe_path}: Data transferred from s3 to aws')

        # Rename the files to strip the UUID appended to each dataset name
        session_path = Path(root_path).joinpath(session)
        for file in session_path.glob('**/*'):
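            # dataset names on FlatIron embed a 36-character UUID before the
            # final extension (e.g. spikes.times.<uuid>.npy), so the inner
            # suffix, dot included, is 37 characters long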
            if len(Path(file.stem).suffix) == 37:
                file.rename(Path(file.parent, str(Path(file.stem).stem) + file.suffix))
                _logger.info(f'Renamed dataset {file.stem} to {Path(file.stem).stem}')
            else:
                _logger.warning(f'Dataset {file.stem} not renamed')

        # Create a sort_me.flag
        cmd = f'touch /mnt/s0/Data/{session}/sort_me.flag'
        result, error = _run_command(cmd)
        _logger.info(f'{session}: sort_me.flag created')
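        # job_run_ks2 below picks up sessions flagged in this way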

        # Remove files from s3
        cmd = f'aws s3 rm --recursive s3://ibl-ks2-storage/{session}'
        result, error = _run_command(cmd)
        if result is None:
            _logger.error(f'{session}: Could not remove data from s3 \n'
                          f'Error: {error}')
            return
        else:
            _logger.info(f'{session}: Data removed from s3')

    return


def job_run_ks2():
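    """Spike sort the oldest session flagged with a sort_me.flag file.

    Syncs the probes, runs the KS2 SpikeSorting task and registers the
    outputs to Alyx through the FTP patcher.
    """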
    # Look for flag files in /mnt/s0/Data, oldest first by modification time
    # (a proxy for creation order, since the flags are created by touch)
    flag_files = list(Path(root_path).glob('**/sort_me.flag'))
    flag_files.sort(key=os.path.getmtime)
    if not flag_files:
        # Nothing flagged for sorting
        return

    # Start with the oldest flag
    session_path = flag_files[0].parent
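    # Recover the session path relative to root_path; parts[0:4] are
    # ('/', 'mnt', 's0', 'Data')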
    session = str(PosixPath(*session_path.parts[4:]))
    flag_files[0].unlink()

    # Instantiate ONE
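    # (cache_rest=None disables caching of REST responses so that queries
    # to Alyx are always fresh)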
    one = ONE(cache_rest=None)

    # Sync the probes
    status, sync_files = sync_probes.sync(session_path)

    if not status:
        _logger.error(f'{session}: Could not sync probes')
        return
    else:
        _logger.info(f'{session}: Probes successfully synced')
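
    # Run KS2 through the ibllib SpikeSorting task; task.run() returns a
    # status of 0 on success (per the check below)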
    task = ephys.SpikeSorting(session_path, one=one)
    status = task.run()

    if status != 0:
        _logger.error(f'{session}: Could not run ks2')
        return
    else:
        _logger.info(f'{session}: ks2 successfully completed')

    # Run the cell QC
    # qc_file = []

    # Register and upload the output files to Alyx via the FTP patcher
    outfiles = task.outputs
    ftp_patcher = FTPPatcher(one=one)
    ftp_patcher.create_dataset(path=outfiles, created_by=one._par.ALYX_LOGIN)

    # Remove everything apart from the alf folder and spike sorter folder
    # Don't do this for now until we are sure it works for both 3A and 3B!!
    # cmd = f'rm -r {session_path}/raw_ephys_data rm -r {session_path}/raw_behavior_data'
    # result, error = _run_command(cmd)