Expand source code
#!/usr/bin/env python3
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
import argparse
import hashlib
import os
import re
import string
import sys
import logging
import arvados
import arvados.commands._util as arv_cmd
import arvados.util as util
from arvados._version import __version__
# Module-level logger and command-line parser for arv-get.
logger = logging.getLogger('arvados.arv-get')

parser = argparse.ArgumentParser(
    description='Copy data from Keep to a local file or pipe.',
    parents=[arv_cmd.retry_opt])
parser.add_argument('--version', action='version',
                    version="%s %s" % (sys.argv[0], __version__),
                    help='Print version and exit.')
parser.add_argument('locator', type=str,
                    help="""
Collection locator, optionally with a file path or prefix.
""")
parser.add_argument('destination', type=str, nargs='?', default='-',
                    help="""
Local file or directory where the data is to be written. Default: stdout.
""")

# Progress display modes: at most one may be requested explicitly.
group = parser.add_mutually_exclusive_group()
group.add_argument('--progress', action='store_true',
                   help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when it
is not expected to interfere with the output: specifically, stderr is
a tty _and_ either stdout is not a tty, or output is being written to
named files rather than stdout.
""")
group.add_argument('--no-progress', action='store_true',
                   help="""
Do not display human-readable progress on stderr.
""")
group.add_argument('--batch-progress', action='store_true',
                   help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# --hash and --md5sum both store into args.hash, so only one may be given.
group = parser.add_mutually_exclusive_group()
group.add_argument('--hash',
                   help="""
Display the hash of each file as it is read from Keep, using the given
hash algorithm. Supported algorithms include md5, sha1, sha224,
sha256, sha384, and sha512.
""")
group.add_argument('--md5sum', action='store_const',
                   dest='hash', const='md5',
                   help="""
Display the MD5 hash of each file as it is read from Keep.
""")

parser.add_argument('-n', action='store_true',
                    help="""
Do not write any data -- just read from Keep, and report md5sums if
requested.
""")
parser.add_argument('-r', action='store_true',
                    help="""
Retrieve all files in the specified collection/prefix. This is the
default behavior if the "locator" argument ends with a forward slash.
""")

# Policies for pre-existing output files (-f / --skip-existing) are mutually
# exclusive. -v and --strip-manifest are independent of that choice, so they
# are added to the parser itself: keeping them in this group made harmless
# combinations such as "-f -v" or "-f --strip-manifest" a usage error.
group = parser.add_mutually_exclusive_group()
group.add_argument('-f', action='store_true',
                   help="""
Overwrite existing files while writing. The default behavior is to
refuse to write *anything* if any of the output files already
exist. As a special case, -f is not needed to write to stdout.
""")
parser.add_argument('-v', action='count', default=0,
                    help="""
Once for verbose mode, twice for debug mode.
""")
group.add_argument('--skip-existing', action='store_true',
                   help="""
Skip files that already exist. The default behavior is to refuse to
write *anything* if any files exist that would have to be
overwritten. This option causes even devices, sockets, and fifos to be
skipped.
""")
parser.add_argument('--strip-manifest', action='store_true', default=False,
                    help="""
When getting a collection manifest, strip its access tokens before writing
it.
""")
parser.add_argument('--threads', type=int, metavar='N', default=4,
                    help="""
Set the number of download threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 4 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
def parse_arguments(arguments, stdout, stderr):
    """Parse and normalize arv-get's command-line arguments.

    Besides plain parsing, this:
      * rejects a --hash algorithm that hashlib cannot construct;
      * turns on -r when the locator ends with a path separator;
      * requires an existing destination directory for -r (unless -n);
      * appends the locator's basename when a single file is sent to a
        directory;
      * maps "/dev/stdout" to "-" and implies -f when writing to stdout;
      * enables --progress by default when stderr is a tty and output is
        not going to a tty stdout.

    Args:
        arguments: Argument list to parse; None lets argparse use sys.argv.
        stdout: Stream whose isatty() influences the default progress mode.
        stderr: Stream whose isatty() influences the default progress mode.

    Returns:
        The argparse namespace with the adjustments above applied.
        Invalid combinations exit through parser.error().
    """
    args = parser.parse_args(arguments)

    # Validate the hash algorithm now, before any network access or output
    # file creation; otherwise hashlib.new() would fail mid-download.
    if args.hash:
        try:
            hashlib.new(args.hash)
        except ValueError:
            parser.error("unsupported hash algorithm: '{}'".format(args.hash))

    # endswith() (rather than [-1]) tolerates empty strings.
    if args.locator.endswith(os.sep):
        args.r = True
    if (args.r and
            not args.n and
            not (args.destination and
                 os.path.isdir(args.destination))):
        parser.error('Destination is not a directory.')
    if not args.r and (os.path.isdir(args.destination) or
                       args.destination.endswith(os.path.sep)):
        args.destination = os.path.join(args.destination,
                                        os.path.basename(args.locator))
        logger.debug("Appended source file name to destination directory: %s",
                     args.destination)

    if args.destination == '/dev/stdout':
        args.destination = "-"

    if args.destination == '-':
        # Normally you have to use -f to write to a file (or device) that
        # already exists, but "-" and "/dev/stdout" are common enough to
        # merit a special exception.
        args.f = True
    else:
        # Keep the filesystem root as "/" instead of collapsing it to "",
        # which would turn later joins into paths relative to the CWD.
        args.destination = args.destination.rstrip(os.sep) or os.sep

    # Turn on --progress by default if stderr is a tty and output is
    # either going to a named file, or going (via stdout) to something
    # that isn't a tty.
    if (not (args.batch_progress or args.no_progress)
            and stderr.isatty()
            and (args.destination != '-'
                 or not stdout.isatty())):
        args.progress = True
    return args
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Run arv-get: copy a manifest, Keep block, or collection files locally.

    Args:
        arguments: Command-line arguments; None lets argparse use sys.argv.
        stdout: Stream used when the destination is "-". If this is the real
            sys.stdout, its binary ``buffer`` is used instead.
        stderr: Stream that receives progress output and file hashes.

    Returns:
        Exit status: 0 on success, 1 on any reported failure or when the
        download is interrupted with Ctrl-C.
    """
    if stdout is sys.stdout and hasattr(stdout, 'buffer'):
        # in Python 3, write to stdout as binary
        stdout = stdout.buffer

    args = parse_arguments(arguments, stdout, stderr)
    # Each -v lowers the threshold one step (WARNING -> INFO -> DEBUG).
    # Clamp at DEBUG: a computed level of 0 is NOTSET, which defers to the
    # parent logger and would make -vvv *less* verbose than -vv.
    logger.setLevel(max(logging.DEBUG, logging.WARNING - 10 * args.v))

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: %s', request_id)
    api_client = arvados.api('v1', request_id=request_id)

    # Split "<collection>[/path]" into the collection part and the optional
    # path part (which keeps its leading "/").
    r = re.search(r'^(.*?)(/.*)?$', args.locator)
    col_loc = r.group(1)
    get_prefix = r.group(2)
    if args.r and not get_prefix:
        get_prefix = os.sep

    # User asked to download the collection's manifest (or a single block).
    if not get_prefix:
        if not args.n:
            # O_TRUNC so that overwriting (-f) a longer existing file does
            # not leave stale bytes after the new content. Without -f,
            # O_EXCL refuses to touch an existing file at all.
            open_flags = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
            if not args.f:
                open_flags |= os.O_EXCL
            try:
                if args.destination == "-":
                    write_block_or_manifest(
                        dest=stdout, src=col_loc,
                        api_client=api_client, args=args)
                else:
                    # 0o666 (filtered by the umask) matches open(); the
                    # os.open default of 0o777 would create an executable.
                    out_fd = os.open(args.destination, open_flags, 0o666)
                    with os.fdopen(out_fd, 'wb') as out_file:
                        write_block_or_manifest(
                            dest=out_file, src=col_loc,
                            api_client=api_client, args=args)
            except OSError as error:
                logger.error("can't write to '{}': {}".format(args.destination, error))
                return 1
            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
                logger.error("failed to download '{}': {}".format(col_loc, error))
                return 1
            except arvados.errors.ArgumentError as error:
                if 'Argument to CollectionReader' in str(error):
                    logger.error("error reading collection: {}".format(error))
                    return 1
                else:
                    raise
        return 0

    try:
        # The block cache gets 64 MiB per download thread, plus one.
        reader = arvados.CollectionReader(
            col_loc, api_client=api_client, num_retries=args.retries,
            keep_client=arvados.keep.KeepClient(
                block_cache=arvados.keep.KeepBlockCache(
                    (args.threads + 1) * 64 * 1024 * 1024)),
            get_threads=args.threads)
    except Exception as error:
        logger.error("failed to read collection: {}".format(error))
        return 1

    # Scan the collection. Make a list of (stream, file, local
    # destination filename) tuples, and add up total size to extract.
    todo = []
    todo_bytes = 0
    try:
        if get_prefix == os.sep:
            item = reader
        else:
            item = reader.find('.' + get_prefix)

        if isinstance(item, (arvados.collection.Subcollection,
                             arvados.collection.CollectionReader)):
            # If the user asked for a file and we got a subcollection, error out.
            if not get_prefix.endswith(os.sep):
                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
                return 1
            # If the user asked stdout as a destination, error out.
            elif args.destination == '-':
                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
                return 1
            # User asked for a subcollection, and that's what was found.
            # Add up total size to download.
            for s, f in files_in_collection(item):
                # Drop the leading "." + get_prefix from the in-collection
                # path so the file lands relative to the destination dir.
                dest_path = os.path.join(
                    args.destination,
                    os.path.join(s.stream_name(), f.name)[len(get_prefix) + 1:])
                if (not (args.n or args.f or args.skip_existing) and
                        os.path.exists(dest_path)):
                    logger.error('Local file %s already exists.', dest_path)
                    return 1
                todo.append((s, f, dest_path))
                todo_bytes += f.size()
        elif isinstance(item, arvados.arvfile.ArvadosFile):
            todo.append((item.parent, item, args.destination))
            todo_bytes += item.size()
        else:
            logger.error("'{}' not found.".format('.' + get_prefix))
            return 1
    except (IOError, arvados.errors.NotFoundError) as e:
        logger.error(e)
        return 1

    out_bytes = 0
    interrupted = False
    for s, f, outfilename in todo:
        outfile = None
        digestor = None
        if not args.n:
            if outfilename == "-":
                outfile = stdout
            else:
                if args.skip_existing and os.path.exists(outfilename):
                    logger.debug('Local file %s exists. Skipping.', outfilename)
                    continue
                elif not args.f and (os.path.isfile(outfilename) or
                                     os.path.isdir(outfilename)):
                    # Good thing we looked again: apparently this file wasn't
                    # here yet when we checked earlier.
                    logger.error('Local file %s already exists.', outfilename)
                    return 1
                if args.r:
                    arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
                try:
                    outfile = open(outfilename, 'wb')
                except Exception as error:
                    logger.error('Open(%s) failed: %s', outfilename, error)
                    return 1
        if args.hash:
            digestor = hashlib.new(args.hash)
        try:
            with s.open(f.name, 'rb') as file_reader:
                for data in file_reader.readall():
                    if outfile:
                        outfile.write(data)
                    if digestor:
                        digestor.update(data)
                    out_bytes += len(data)
                    if args.progress:
                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
                                     (out_bytes >> 20,
                                      todo_bytes >> 20,
                                      (100
                                       if todo_bytes == 0
                                       else 100.0 * out_bytes / todo_bytes)))
                    elif args.batch_progress:
                        stderr.write('%s %d read %d total %d\n' %
                                     (sys.argv[0], os.getpid(),
                                      out_bytes, todo_bytes))
            if digestor:
                stderr.write("%s %s/%s\n"
                             % (digestor.hexdigest(), s.stream_name(), f.name))
        except KeyboardInterrupt:
            interrupted = True
            if outfile is not None and outfile is not stdout:
                # Close before unlinking, and only remove regular files:
                # a device or fifo given as the destination must survive.
                outfile.close()
                if os.path.isfile(outfilename):
                    try:
                        os.unlink(outfilename)
                    except OSError as error:
                        logger.warning("could not remove partial file '%s': %s",
                                       outfilename, error)
            break
        finally:
            if outfile is not None and outfile is not stdout:
                outfile.close()

    if args.progress:
        stderr.write('\n')
    if interrupted:
        # Do not report success for a download the user cut short.
        logger.error('download interrupted')
        return 1
    return 0
def files_in_collection(c):
    """Yield ``(parent, file)`` for every file reachable from collection *c*.

    At each level, plain files are listed before subcollections, and entries
    are ordered by their upper-cased name. Subcollections are walked
    recursively, so each yielded ``parent`` is the (sub)collection that
    directly contains the file.
    """
    def _sort_key(name):
        # False (a file) sorts ahead of True (a subcollection).
        return (isinstance(c[name], arvados.collection.Subcollection),
                name.upper())

    for name in sorted(c.keys(), key=_sort_key):
        entry = c[name]
        if isinstance(entry, arvados.arvfile.ArvadosFile):
            yield (c, entry)
        elif isinstance(entry, arvados.collection.Subcollection):
            yield from files_in_collection(entry)
def write_block_or_manifest(dest, src, api_client, args):
    """Write either a Keep block's bytes or a collection manifest to *dest*.

    A *src* containing ``'+A'`` is treated as a block locator and its content
    is fetched with a KeepClient. Anything else is treated as a collection
    UUID or portable data hash, and its manifest text (with access tokens
    stripped when ``args.strip_manifest`` is set) is written UTF-8 encoded.

    Args:
        dest: Binary writable stream.
        src: Block locator, collection UUID, or portable data hash.
        api_client: Arvados API client passed to the reader/Keep client.
        args: Parsed arguments; ``retries`` and ``strip_manifest`` are read.
    """
    if '+A' not in src:
        # collection UUID or portable data hash
        collection = arvados.CollectionReader(
            src, api_client=api_client, num_retries=args.retries)
        text = collection.manifest_text(strip=args.strip_manifest)
        dest.write(text.encode())
        return
    # block locator
    keep = arvados.keep.KeepClient(api_client=api_client)
    dest.write(keep.get(src, num_retries=args.retries))