arvados.commands.get
#!/usr/bin/env python3
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
"""arv-get: copy data from Arvados Keep to a local file, directory, or pipe."""

import argparse
import hashlib
import logging
import os
import pathlib
import re
import string
import sys

import arvados
import arvados.commands._util as arv_cmd
import arvados.util as util

from arvados._version import __version__

logger = logging.getLogger('arvados.arv-get')

parser = argparse.ArgumentParser(
    description='Copy data from Keep to a local file or pipe.',
    parents=[arv_cmd.retry_opt])
parser.add_argument('--version', action='version',
                    version="%s %s" % (sys.argv[0], __version__),
                    help='Print version and exit.')
parser.add_argument('locator', type=str,
                    help="""
Collection locator, optionally with a file path or prefix.
""")
parser.add_argument('destination', type=str, nargs='?', default='-',
                    help="""
Local file or directory where the data is to be written. Default: stdout.
""")

# Progress reporting styles are mutually exclusive.
group = parser.add_mutually_exclusive_group()
group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when it
is not expected to interfere with the output: specifically, stderr is
a tty _and_ either stdout is not a tty, or output is being written to
named files rather than stdout.
""")
group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr.
""")
group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# Hash reporting options: --md5sum is shorthand for --hash md5.
group = parser.add_mutually_exclusive_group()
group.add_argument('--hash',
                   help="""
Display the hash of each file as it is read from Keep, using the given
hash algorithm. Supported algorithms include md5, sha1, sha224,
sha256, sha384, and sha512.
""")
group.add_argument('--md5sum', action='store_const',
                   dest='hash', const='md5',
                   help="""
Display the MD5 hash of each file as it is read from Keep.
""")
parser.add_argument('-n', action='store_true',
                    help="""
Do not write any data -- just read from Keep, and report md5sums if
requested.
""")
parser.add_argument('-r', action='store_true',
                    help="""
Retrieve all files in the specified collection/prefix. This is the
default behavior if the "locator" argument ends with a forward slash.
""")

# Overwrite/skip policies are mutually exclusive.
group = parser.add_mutually_exclusive_group()
group.add_argument('-f', action='store_true',
                   help="""
Overwrite existing files while writing. The default behavior is to
refuse to write *anything* if any of the output files already
exist. As a special case, -f is not needed to write to stdout.
""")
group.add_argument('--skip-existing', action='store_true',
                   help="""
Skip files that already exist. The default behavior is to refuse to
write *anything* if any files exist that would have to be
overwritten. This option causes even devices, sockets, and fifos to be
skipped.
""")
group.add_argument('--strip-manifest', action='store_true', default=False,
                   help="""
When getting a collection manifest, strip its access tokens before writing
it.
""")
# NOTE(review): -v was previously added to the overwrite-policy
# mutually-exclusive group, which made "-f -v" an argparse error.
# Verbosity is orthogonal to overwrite policy, so it is a plain
# parser-level option here (a backward-compatible relaxation).
parser.add_argument('-v', action='count', default=0,
                    help="""
Once for verbose mode, twice for debug mode.
""")

parser.add_argument('--threads', type=int, metavar='N', default=4,
                    help="""
Set the number of download threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 4 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

def parse_arguments(arguments, stdout, stderr):
    """Parse and normalize arv-get's command-line arguments.

    Normalization performed on the parsed namespace:
    - a trailing path separator on the locator turns on recursive mode;
    - a directory destination gets the locator's base name appended;
    - '/dev/stdout' is canonicalized to '-', and '-' implies -f;
    - --progress is enabled by default when it cannot corrupt output.

    Raises SystemExit (via parser.error) if recursive mode is requested
    but the destination is not a directory.
    """
    args = parser.parse_args(arguments)

    # endswith() instead of locator[-1]: avoids IndexError on an empty
    # locator argument.
    if args.locator.endswith(os.sep):
        args.r = True
    if (args.r and
        not args.n and
        not (args.destination and
             os.path.isdir(args.destination))):
        parser.error('Destination is not a directory.')
    if not args.r and (os.path.isdir(args.destination) or
                       args.destination.endswith(os.path.sep)):
        args.destination = os.path.join(args.destination,
                                        os.path.basename(args.locator))
        logger.debug("Appended source file name to destination directory: %s",
                     args.destination)

    if args.destination == '/dev/stdout':
        args.destination = "-"

    if args.destination == '-':
        # Normally you have to use -f to write to a file (or device) that
        # already exists, but "-" and "/dev/stdout" are common enough to
        # merit a special exception.
        args.f = True
    else:
        args.destination = args.destination.rstrip(os.sep)

    # Turn on --progress by default if stderr is a tty and output is
    # either going to a named file, or going (via stdout) to something
    # that isn't a tty.
    if (not (args.batch_progress or args.no_progress)
        and stderr.isatty()
        and (args.destination != '-'
             or not stdout.isatty())):
        args.progress = True
    return args

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Entry point for arv-get. Returns a shell-style exit code (0/1).

    Depending on the locator, downloads a collection manifest or raw
    Keep block, a single file, or a whole (sub)collection tree.
    """
    if stdout is sys.stdout and hasattr(stdout, 'buffer'):
        # in Python 3, write to stdout as binary
        stdout = stdout.buffer

    args = parse_arguments(arguments, stdout, stderr)
    # Each -v raises verbosity one level: WARNING -> INFO -> DEBUG.
    logger.setLevel(logging.WARNING - 10 * args.v)

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: '+request_id)

    api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)

    # Split the locator into the collection identifier and an optional
    # path/prefix inside the collection.
    r = re.search(r'^(.*?)(/.*)?$', args.locator)
    col_loc = r.group(1)
    get_prefix = r.group(2)
    if args.r and not get_prefix:
        get_prefix = os.sep

    # User asked to download the collection's manifest (or a raw block).
    if not get_prefix:
        if not args.n:
            open_flags = os.O_CREAT | os.O_WRONLY
            if not args.f:
                # Without -f, refuse to clobber an existing destination.
                open_flags |= os.O_EXCL
            try:
                if args.destination == "-":
                    write_block_or_manifest(
                        dest=stdout, src=col_loc,
                        api_client=api_client, args=args)
                else:
                    out_fd = os.open(args.destination, open_flags)
                    with os.fdopen(out_fd, 'wb') as out_file:
                        write_block_or_manifest(
                            dest=out_file, src=col_loc,
                            api_client=api_client, args=args)
            except (IOError, OSError) as error:
                logger.error("can't write to '{}': {}".format(args.destination, error))
                return 1
            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
                logger.error("failed to download '{}': {}".format(col_loc, error))
                return 1
            except arvados.errors.ArgumentError as error:
                if 'Argument to CollectionReader' in str(error):
                    logger.error("error reading collection: {}".format(error))
                    return 1
                else:
                    raise
        return 0

    try:
        # Block cache is sized proportionally to the thread count.
        reader = arvados.CollectionReader(
            col_loc, api_client=api_client, num_retries=args.retries,
            keep_client=arvados.keep.KeepClient(
                block_cache=arvados.keep.KeepBlockCache(
                    (args.threads+1)*64 * 1024 * 1024),
                num_prefetch_threads=args.threads))
    except Exception as error:
        logger.error("failed to read collection: {}".format(error))
        return 1

    # Scan the collection. Make an array of (stream, file, local
    # destination filename) tuples, and add up total size to extract.
    todo = []
    todo_bytes = 0
    try:
        if get_prefix == os.sep:
            item = reader
        else:
            item = reader.find('.' + get_prefix)

        if isinstance(item, (arvados.collection.Subcollection,
                             arvados.collection.CollectionReader)):
            # If the user asked for a file and we got a subcollection, error out.
            if get_prefix[-1] != os.sep:
                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
                return 1
            # If the user asked stdout as a destination, error out.
            elif args.destination == '-':
                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
                return 1
            # User asked for a subcollection, and that's what was found. Add up total size
            # to download.
            for s, f in files_in_collection(item):
                dest_path = os.path.join(
                    args.destination,
                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
                if (not (args.n or args.f or args.skip_existing) and
                    os.path.exists(dest_path)):
                    logger.error('Local file %s already exists.' % (dest_path,))
                    return 1
                todo += [(s, f, dest_path)]
                todo_bytes += f.size()
        elif isinstance(item, arvados.arvfile.ArvadosFile):
            todo += [(item.parent, item, args.destination)]
            todo_bytes += item.size()
        else:
            logger.error("'{}' not found.".format('.' + get_prefix))
            return 1
    except (IOError, arvados.errors.NotFoundError) as e:
        logger.error(e)
        return 1

    out_bytes = 0
    for s, f, outfilename in todo:
        outfile = None
        digestor = None
        if not args.n:
            if outfilename == "-":
                outfile = stdout
            else:
                if args.skip_existing and os.path.exists(outfilename):
                    logger.debug('Local file %s exists. Skipping.', outfilename)
                    continue
                elif not args.f and (os.path.isfile(outfilename) or
                                     os.path.isdir(outfilename)):
                    # Good thing we looked again: apparently this file wasn't
                    # here yet when we checked earlier.
                    logger.error('Local file %s already exists.' % (outfilename,))
                    return 1
                if args.r:
                    # Recursive mode: create parent directories as needed.
                    pathlib.Path(outfilename).parent.mkdir(parents=True, exist_ok=True)
                try:
                    outfile = open(outfilename, 'wb')
                except Exception as error:
                    logger.error('Open(%s) failed: %s' % (outfilename, error))
                    return 1
        if args.hash:
            digestor = hashlib.new(args.hash)
        try:
            with s.open(f.name, 'rb') as file_reader:
                for data in file_reader.readall():
                    if outfile:
                        outfile.write(data)
                    if digestor:
                        digestor.update(data)
                    out_bytes += len(data)
                    if args.progress:
                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
                                     (out_bytes >> 20,
                                      todo_bytes >> 20,
                                      (100
                                       if todo_bytes==0
                                       else 100.0*out_bytes/todo_bytes)))
                    elif args.batch_progress:
                        stderr.write('%s %d read %d total %d\n' %
                                     (sys.argv[0], os.getpid(),
                                      out_bytes, todo_bytes))
            if digestor:
                stderr.write("%s %s/%s\n"
                             % (digestor.hexdigest(), s.stream_name(), f.name))
        except KeyboardInterrupt:
            # Remove a partially-written regular file (never fds 0-2).
            if outfile and (outfile.fileno() > 2) and not outfile.closed:
                os.unlink(outfile.name)
            break
        finally:
            # Identity comparison: was "outfile != None and outfile != stdout".
            if outfile is not None and outfile is not stdout:
                outfile.close()

    if args.progress:
        stderr.write('\n')
    return 0

def files_in_collection(c):
    """Recursively yield (parent, file) pairs for every file under *c*.

    At each level, files sort before subcollections; within each group,
    entries come in case-insensitive name order.
    """
    # Sort first by file type, then alphabetically by file path.
    for i in sorted(list(c.keys()),
                    key=lambda k: (
                        isinstance(c[k], arvados.collection.Subcollection),
                        k.upper())):
        if isinstance(c[i], arvados.arvfile.ArvadosFile):
            yield (c, c[i])
        elif isinstance(c[i], arvados.collection.Subcollection):
            for s, f in files_in_collection(c[i]):
                yield (s, f)

def write_block_or_manifest(dest, src, api_client, args):
    """Write a single Keep block or a collection manifest to *dest*.

    A locator containing a '+A' (permission signature) hint is treated as
    a block locator and fetched directly from Keep; anything else is
    treated as a collection UUID or portable data hash, whose manifest
    text is written (with tokens stripped if args.strip_manifest is set).
    """
    if '+A' in src:
        # block locator
        kc = arvados.keep.KeepClient(api_client=api_client)
        dest.write(kc.get(src, num_retries=args.retries))
    else:
        # collection UUID or portable data hash
        reader = arvados.CollectionReader(
            src, api_client=api_client, num_retries=args.retries)
        dest.write(reader.manifest_text(strip=args.strip_manifest).encode())
logger =
<Logger arvados.arv-get (WARNING)>
parser =
ArgumentParser(prog='pysdk_pdoc.py', usage=None, description='Copy data from Keep to a local file or pipe.', formatter_class=<class 'argparse.HelpFormatter'>, conflict_handler='error', add_help=True)
group =
<argparse._MutuallyExclusiveGroup object>
def
parse_arguments(arguments, stdout, stderr):
def parse_arguments(arguments, stdout, stderr):
    """Parse arv-get's command line and normalize the resulting options.

    Normalization on the parsed namespace:
    - a trailing path separator on the locator turns on recursive mode;
    - a directory destination gets the locator's base name appended;
    - '/dev/stdout' is canonicalized to '-', and '-' implies -f;
    - --progress defaults on when it cannot get mixed into the output.
    """
    opts = parser.parse_args(arguments)

    if opts.locator[-1] == os.sep:
        # Trailing slash means "fetch everything under this prefix".
        opts.r = True

    dest_is_dir = opts.destination and os.path.isdir(opts.destination)
    if opts.r and not opts.n and not dest_is_dir:
        parser.error('Destination is not a directory.')

    if not opts.r:
        if os.path.isdir(opts.destination) or opts.destination[-1] == os.path.sep:
            opts.destination = os.path.join(opts.destination,
                                            os.path.basename(opts.locator))
            logger.debug("Appended source file name to destination directory: %s",
                         opts.destination)

    if opts.destination == '/dev/stdout':
        opts.destination = "-"

    if opts.destination == '-':
        # Writing over an existing file normally needs -f, but stdout is
        # common enough to merit a special exception.
        opts.f = True
    else:
        opts.destination = opts.destination.rstrip(os.sep)

    # Enable --progress by default when stderr is a tty and the report
    # cannot corrupt the data stream: either the data goes to a named
    # file, or stdout is not a tty.
    explicit_choice = opts.batch_progress or opts.no_progress
    if not explicit_choice and stderr.isatty():
        if opts.destination != '-' or not stdout.isatty():
            opts.progress = True
    return opts
def
main( arguments=None, stdout=<_io.StringIO object>, stderr=<_io.StringIO object>):
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Entry point for arv-get.

    Parses *arguments* (None means sys.argv), then downloads either a
    collection manifest / raw Keep block, a single file, or a whole
    (sub)collection tree to *stdout* or a local path.

    Returns a shell-style exit code: 0 on success, 1 on error.
    """
    if stdout is sys.stdout and hasattr(stdout, 'buffer'):
        # in Python 3, write to stdout as binary
        stdout = stdout.buffer

    args = parse_arguments(arguments, stdout, stderr)
    # Each -v raises verbosity one level: WARNING -> INFO -> DEBUG.
    logger.setLevel(logging.WARNING - 10 * args.v)

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: '+request_id)

    api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)

    # Split the locator into the collection identifier (group 1) and an
    # optional file path/prefix inside the collection (group 2).
    r = re.search(r'^(.*?)(/.*)?$', args.locator)
    col_loc = r.group(1)
    get_prefix = r.group(2)
    if args.r and not get_prefix:
        get_prefix = os.sep

    # User asked to download the collection's manifest
    if not get_prefix:
        if not args.n:
            open_flags = os.O_CREAT | os.O_WRONLY
            if not args.f:
                # Without -f, O_EXCL makes os.open fail on existing files.
                open_flags |= os.O_EXCL
            try:
                if args.destination == "-":
                    write_block_or_manifest(
                        dest=stdout, src=col_loc,
                        api_client=api_client, args=args)
                else:
                    out_fd = os.open(args.destination, open_flags)
                    with os.fdopen(out_fd, 'wb') as out_file:
                        write_block_or_manifest(
                            dest=out_file, src=col_loc,
                            api_client=api_client, args=args)
            except (IOError, OSError) as error:
                logger.error("can't write to '{}': {}".format(args.destination, error))
                return 1
            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
                logger.error("failed to download '{}': {}".format(col_loc, error))
                return 1
            except arvados.errors.ArgumentError as error:
                if 'Argument to CollectionReader' in str(error):
                    logger.error("error reading collection: {}".format(error))
                    return 1
                else:
                    raise
        return 0

    try:
        # Block cache sized proportionally to the download thread count.
        reader = arvados.CollectionReader(
            col_loc, api_client=api_client, num_retries=args.retries,
            keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024), num_prefetch_threads=args.threads))
    except Exception as error:
        logger.error("failed to read collection: {}".format(error))
        return 1

    # Scan the collection. Make an array of (stream, file, local
    # destination filename) tuples, and add up total size to extract.
    todo = []
    todo_bytes = 0
    try:
        if get_prefix == os.sep:
            item = reader
        else:
            item = reader.find('.' + get_prefix)

        if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
            # If the user asked for a file and we got a subcollection, error out.
            if get_prefix[-1] != os.sep:
                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
                return 1
            # If the user asked stdout as a destination, error out.
            elif args.destination == '-':
                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
                return 1
            # User asked for a subcollection, and that's what was found. Add up total size
            # to download.
            for s, f in files_in_collection(item):
                dest_path = os.path.join(
                    args.destination,
                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
                if (not (args.n or args.f or args.skip_existing) and
                    os.path.exists(dest_path)):
                    logger.error('Local file %s already exists.' % (dest_path,))
                    return 1
                todo += [(s, f, dest_path)]
                todo_bytes += f.size()
        elif isinstance(item, arvados.arvfile.ArvadosFile):
            todo += [(item.parent, item, args.destination)]
            todo_bytes += item.size()
        else:
            logger.error("'{}' not found.".format('.' + get_prefix))
            return 1
    except (IOError, arvados.errors.NotFoundError) as e:
        logger.error(e)
        return 1

    out_bytes = 0
    for s, f, outfilename in todo:
        outfile = None
        digestor = None
        if not args.n:
            if outfilename == "-":
                outfile = stdout
            else:
                if args.skip_existing and os.path.exists(outfilename):
                    logger.debug('Local file %s exists. Skipping.', outfilename)
                    continue
                elif not args.f and (os.path.isfile(outfilename) or
                                     os.path.isdir(outfilename)):
                    # Good thing we looked again: apparently this file wasn't
                    # here yet when we checked earlier.
                    logger.error('Local file %s already exists.' % (outfilename,))
                    return 1
                if args.r:
                    # Recursive mode: create parent directories as needed.
                    pathlib.Path(outfilename).parent.mkdir(parents=True, exist_ok=True)
                try:
                    outfile = open(outfilename, 'wb')
                except Exception as error:
                    logger.error('Open(%s) failed: %s' % (outfilename, error))
                    return 1
        if args.hash:
            digestor = hashlib.new(args.hash)
        try:
            with s.open(f.name, 'rb') as file_reader:
                for data in file_reader.readall():
                    if outfile:
                        outfile.write(data)
                    if digestor:
                        digestor.update(data)
                    out_bytes += len(data)
                    if args.progress:
                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
                                     (out_bytes >> 20,
                                      todo_bytes >> 20,
                                      (100
                                       if todo_bytes==0
                                       else 100.0*out_bytes/todo_bytes)))
                    elif args.batch_progress:
                        stderr.write('%s %d read %d total %d\n' %
                                     (sys.argv[0], os.getpid(),
                                      out_bytes, todo_bytes))
            if digestor:
                stderr.write("%s %s/%s\n"
                             % (digestor.hexdigest(), s.stream_name(), f.name))
        except KeyboardInterrupt:
            # Remove a partially-written regular file (fileno > 2 guards
            # against unlinking std streams).
            if outfile and (outfile.fileno() > 2) and not outfile.closed:
                os.unlink(outfile.name)
            break
        finally:
            if outfile != None and outfile != stdout:
                outfile.close()

    if args.progress:
        stderr.write('\n')
    return 0
def
files_in_collection(c):
def files_in_collection(c):
    """Recursively yield (parent, file) pairs for every file under *c*.

    At each level files come before subcollections; within each group,
    entries are ordered case-insensitively by name.
    """
    def entry_order(name):
        # Sort first by file type, then alphabetically by file path.
        return (isinstance(c[name], arvados.collection.Subcollection),
                name.upper())

    for name in sorted(c.keys(), key=entry_order):
        entry = c[name]
        if isinstance(entry, arvados.arvfile.ArvadosFile):
            yield (c, entry)
        elif isinstance(entry, arvados.collection.Subcollection):
            yield from files_in_collection(entry)
def
write_block_or_manifest(dest, src, api_client, args):
def write_block_or_manifest(dest, src, api_client, args):
    """Write a single Keep block or a collection manifest to *dest*.

    A locator containing a '+A' hint is treated as a block locator and
    fetched directly from Keep; anything else is treated as a collection
    UUID or portable data hash, whose manifest text is written
    (access tokens are stripped when args.strip_manifest is set).
    """
    if '+A' not in src:
        # collection UUID or portable data hash
        reader = arvados.CollectionReader(
            src, api_client=api_client, num_retries=args.retries)
        manifest = reader.manifest_text(strip=args.strip_manifest)
        dest.write(manifest.encode())
    else:
        # block locator
        keep = arvados.keep.KeepClient(api_client=api_client)
        dest.write(keep.get(src, num_retries=args.retries))