arvados.commands.get

  1#!/usr/bin/env python3
  2# Copyright (C) The Arvados Authors. All rights reserved.
  3#
  4# SPDX-License-Identifier: Apache-2.0
  5
  6import argparse
  7import hashlib
  8import os
  9import pathlib
 10import re
 11import string
 12import sys
 13import logging
 14
 15import arvados
 16import arvados.commands._util as arv_cmd
 17import arvados.util as util
 18
 19from arvados._version import __version__
 20
 21logger = logging.getLogger('arvados.arv-get')
 22
 23parser = argparse.ArgumentParser(
 24    description='Copy data from Keep to a local file or pipe.',
 25    parents=[arv_cmd.retry_opt])
 26parser.add_argument('--version', action='version',
 27                    version="%s %s" % (sys.argv[0], __version__),
 28                    help='Print version and exit.')
 29parser.add_argument('locator', type=str,
 30                    help="""
 31Collection locator, optionally with a file path or prefix.
 32""")
 33parser.add_argument('destination', type=str, nargs='?', default='-',
 34                    help="""
 35Local file or directory where the data is to be written. Default: stdout.
 36""")
 37group = parser.add_mutually_exclusive_group()
 38group.add_argument('--progress', action='store_true',
 39                   help="""
 40Display human-readable progress on stderr (bytes and, if possible,
 41percentage of total data size). This is the default behavior when it
 42is not expected to interfere with the output: specifically, stderr is
 43a tty _and_ either stdout is not a tty, or output is being written to
 44named files rather than stdout.
 45""")
 46group.add_argument('--no-progress', action='store_true',
 47                   help="""
 48Do not display human-readable progress on stderr.
 49""")
 50group.add_argument('--batch-progress', action='store_true',
 51                   help="""
 52Display machine-readable progress on stderr (bytes and, if known,
 53total data size).
 54""")
 55group = parser.add_mutually_exclusive_group()
 56group.add_argument('--hash',
 57                    help="""
 58Display the hash of each file as it is read from Keep, using the given
 59hash algorithm. Supported algorithms include md5, sha1, sha224,
 60sha256, sha384, and sha512.
 61""")
 62group.add_argument('--md5sum', action='store_const',
 63                    dest='hash', const='md5',
 64                    help="""
 65Display the MD5 hash of each file as it is read from Keep.
 66""")
 67parser.add_argument('-n', action='store_true',
 68                    help="""
 69Do not write any data -- just read from Keep, and report md5sums if
 70requested.
 71""")
 72parser.add_argument('-r', action='store_true',
 73                    help="""
 74Retrieve all files in the specified collection/prefix. This is the
 75default behavior if the "locator" argument ends with a forward slash.
 76""")
 77group = parser.add_mutually_exclusive_group()
 78group.add_argument('-f', action='store_true',
 79                   help="""
 80Overwrite existing files while writing. The default behavior is to
 81refuse to write *anything* if any of the output files already
 82exist. As a special case, -f is not needed to write to stdout.
 83""")
 84group.add_argument('-v', action='count', default=0,
 85                    help="""
 86Once for verbose mode, twice for debug mode.
 87""")
 88group.add_argument('--skip-existing', action='store_true',
 89                   help="""
 90Skip files that already exist. The default behavior is to refuse to
 91write *anything* if any files exist that would have to be
 92overwritten. This option causes even devices, sockets, and fifos to be
 93skipped.
 94""")
 95group.add_argument('--strip-manifest', action='store_true', default=False,
 96                   help="""
 97When getting a collection manifest, strip its access tokens before writing
 98it.
 99""")
100
101parser.add_argument('--threads', type=int, metavar='N', default=4,
102                    help="""
103Set the number of download threads to be used. Take into account that
104using lots of threads will increase the RAM requirements. Default is
105to use 4 threads.
106On high latency installations, using a greater number will improve
107overall throughput.
108""")
109
def parse_arguments(arguments, stdout, stderr):
    """Parse the arv-get command line and normalize the result.

    Arguments:
      arguments: list of argument strings handed to parser.parse_args();
        None lets argparse read the process arguments.
      stdout: output stream; only its isatty() method is used here.
      stderr: error stream; only its isatty() method is used here.

    Returns:
      The argparse namespace, adjusted as follows:
        * "r" is forced on when the locator ends with the path separator;
        * when not recursive and the destination is a directory (or ends
          with a separator), the locator's base name is appended to it;
        * "/dev/stdout" is rewritten to "-", and "-" implies "f";
        * any other destination has trailing separators stripped;
        * "progress" is turned on by default when stderr is a tty and the
          data is not going to a tty on stdout.

    A recursive download that will write data (no -n) to a destination that
    is not an existing directory is rejected through parser.error().
    """
    args = parser.parse_args(arguments)

    # endswith() instead of indexing [-1]: an empty-string locator or
    # destination must not raise IndexError here.
    if args.locator.endswith(os.sep):
        args.r = True
    # os.path.isdir('') is False, so an empty destination is rejected too.
    if args.r and not args.n and not os.path.isdir(args.destination):
        parser.error('Destination is not a directory.')
    if not args.r and (os.path.isdir(args.destination) or
                       args.destination.endswith(os.path.sep)):
        args.destination = os.path.join(args.destination,
                                        os.path.basename(args.locator))
        logger.debug("Appended source file name to destination directory: %s",
                     args.destination)

    if args.destination == '/dev/stdout':
        args.destination = "-"

    if args.destination == '-':
        # Normally you have to use -f to write to a file (or device) that
        # already exists, but "-" and "/dev/stdout" are common enough to
        # merit a special exception.
        args.f = True
    else:
        args.destination = args.destination.rstrip(os.sep)

    # Turn on --progress by default if stderr is a tty and output is
    # either going to a named file, or going (via stdout) to something
    # that isn't a tty.
    if (not (args.batch_progress or args.no_progress)
        and stderr.isatty()
        and (args.destination != '-'
             or not stdout.isatty())):
        args.progress = True
    return args
147
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Entry point for arv-get: copy data from Keep to a local file or pipe.

    A locator without a path part writes the collection manifest (or a
    single block, see write_block_or_manifest()). A locator with a path
    part downloads one file, or every file below a subcollection.

    Arguments:
      arguments: list of command-line argument strings, or None to let
        argparse read the process arguments.
      stdout: stream that receives the data when the destination is "-".
        When this is sys.stdout itself and it has a "buffer" attribute,
        that binary buffer is used instead.
      stderr: stream that receives progress reports and file hashes.

    Returns:
      1 when an error was logged, otherwise 0.
      NOTE(review): a KeyboardInterrupt during download removes the partial
      file and still returns 0 -- confirm that this is intended.
    """
    if stdout is sys.stdout and hasattr(stdout, 'buffer'):
        # in Python 3, write to stdout as binary
        stdout = stdout.buffer

    args = parse_arguments(arguments, stdout, stderr)
    # Each -v lowers the threshold by one level: WARNING -> INFO -> DEBUG.
    logger.setLevel(logging.WARNING - 10 * args.v)

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: '+request_id)

    api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)

    # Split "collection/path" at the first slash; group 2 keeps its slash.
    r = re.search(r'^(.*?)(/.*)?$', args.locator)
    col_loc = r.group(1)
    get_prefix = r.group(2)
    if args.r and not get_prefix:
        get_prefix = os.sep

    # User asked to download the collection's manifest
    if not get_prefix:
        if not args.n:
            open_flags = os.O_CREAT | os.O_WRONLY
            if not args.f:
                # Without -f, refuse to write over an existing file.
                open_flags |= os.O_EXCL
            try:
                if args.destination == "-":
                    write_block_or_manifest(
                        dest=stdout, src=col_loc,
                        api_client=api_client, args=args)
                else:
                    out_fd = os.open(args.destination, open_flags)
                    with os.fdopen(out_fd, 'wb') as out_file:
                        write_block_or_manifest(
                            dest=out_file, src=col_loc,
                            api_client=api_client, args=args)
            except (IOError, OSError) as error:
                logger.error("can't write to '{}': {}".format(args.destination, error))
                return 1
            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
                logger.error("failed to download '{}': {}".format(col_loc, error))
                return 1
            except arvados.errors.ArgumentError as error:
                if 'Argument to CollectionReader' in str(error):
                    logger.error("error reading collection: {}".format(error))
                    return 1
                else:
                    raise
        return 0

    try:
        # The block cache is sized at 64 MiB per download thread, plus one.
        reader = arvados.CollectionReader(
            col_loc, api_client=api_client, num_retries=args.retries,
            keep_client=arvados.keep.KeepClient(
                block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024),
                num_prefetch_threads=args.threads))
    except Exception as error:
        logger.error("failed to read collection: {}".format(error))
        return 1

    # Scan the collection. Make an array of (stream, file, local
    # destination filename) tuples, and add up total size to extract.
    todo = []
    todo_bytes = 0
    try:
        if get_prefix == os.sep:
            item = reader
        else:
            item = reader.find('.' + get_prefix)

        if isinstance(item, (arvados.collection.Subcollection,
                             arvados.collection.CollectionReader)):
            # If the user asked for a file and we got a subcollection, error out.
            if get_prefix[-1] != os.sep:
                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
                return 1
            # If the user asked stdout as a destination, error out.
            elif args.destination == '-':
                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
                return 1
            # User asked for a subcollection, and that's what was found. Add up total size
            # to download.
            for s, f in files_in_collection(item):
                # Drop the requested prefix from the stream path so the
                # local path is relative to the destination directory.
                dest_path = os.path.join(
                    args.destination,
                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
                if (not (args.n or args.f or args.skip_existing) and
                    os.path.exists(dest_path)):
                    logger.error('Local file %s already exists.' % (dest_path,))
                    return 1
                todo.append((s, f, dest_path))
                todo_bytes += f.size()
        elif isinstance(item, arvados.arvfile.ArvadosFile):
            todo.append((item.parent, item, args.destination))
            todo_bytes += item.size()
        else:
            logger.error("'{}' not found.".format('.' + get_prefix))
            return 1
    except (IOError, arvados.errors.NotFoundError) as e:
        logger.error(e)
        return 1

    out_bytes = 0
    for s, f, outfilename in todo:
        outfile = None
        digestor = None
        if args.hash:
            # Build the digest object before touching the local file, so an
            # unsupported algorithm name is reported cleanly instead of
            # raising after the output file was created or truncated.
            try:
                digestor = hashlib.new(args.hash)
            except ValueError as error:
                logger.error("unsupported hash algorithm '{}': {}".format(args.hash, error))
                return 1
        if not args.n:
            if outfilename == "-":
                outfile = stdout
            else:
                if args.skip_existing and os.path.exists(outfilename):
                    logger.debug('Local file %s exists. Skipping.', outfilename)
                    continue
                elif not args.f and (os.path.isfile(outfilename) or
                                     os.path.isdir(outfilename)):
                    # Good thing we looked again: apparently this file wasn't
                    # here yet when we checked earlier.
                    logger.error('Local file %s already exists.' % (outfilename,))
                    return 1
                if args.r:
                    pathlib.Path(outfilename).parent.mkdir(parents=True, exist_ok=True)
                try:
                    outfile = open(outfilename, 'wb')
                except Exception as error:
                    logger.error('Open(%s) failed: %s' % (outfilename, error))
                    return 1
        try:
            with s.open(f.name, 'rb') as file_reader:
                for data in file_reader.readall():
                    if outfile:
                        outfile.write(data)
                    if digestor:
                        digestor.update(data)
                    out_bytes += len(data)
                    if args.progress:
                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
                                     (out_bytes >> 20,
                                      todo_bytes >> 20,
                                      (100
                                       if todo_bytes==0
                                       else 100.0*out_bytes/todo_bytes)))
                    elif args.batch_progress:
                        stderr.write('%s %d read %d total %d\n' %
                                     (sys.argv[0], os.getpid(),
                                      out_bytes, todo_bytes))
            if digestor:
                stderr.write("%s  %s/%s\n"
                             % (digestor.hexdigest(), s.stream_name(), f.name))
        except KeyboardInterrupt:
            # Remove the partial download, but never one of the standard
            # streams (file descriptors 0-2).
            if outfile and (outfile.fileno() > 2) and not outfile.closed:
                os.unlink(outfile.name)
            break
        finally:
            if outfile is not None and outfile is not stdout:
                outfile.close()

    if args.progress:
        # Finish the "\r"-rewritten progress line.
        stderr.write('\n')
    return 0
306
def files_in_collection(c):
    """Yield a (parent collection, file) pair for every file below c.

    Files directly inside c come first, followed by the contents of each
    subcollection (recursively); within each of those two groups, entries
    are ordered by upper-cased name.
    """
    def sort_key(name):
        # False sorts before True, so plain files precede subcollections.
        return (isinstance(c[name], arvados.collection.Subcollection),
                name.upper())

    for name in sorted(c.keys(), key=sort_key):
        entry = c[name]
        if isinstance(entry, arvados.arvfile.ArvadosFile):
            yield (c, entry)
        elif isinstance(entry, arvados.collection.Subcollection):
            yield from files_in_collection(entry)
318
def write_block_or_manifest(dest, src, api_client, args):
    """Write the data identified by src to the binary stream dest.

    A src containing '+A' is handled as a block locator and the block's
    content is written. Any other src is handled as a collection UUID or
    portable data hash, and the encoded manifest text is written, stripped
    when args.strip_manifest is set. args.retries is passed on as the
    retry count in both cases.
    """
    if '+A' not in src:
        # collection UUID or portable data hash
        collection = arvados.CollectionReader(
            src, api_client=api_client, num_retries=args.retries)
        manifest = collection.manifest_text(strip=args.strip_manifest)
        dest.write(manifest.encode())
        return

    # block locator
    keep_client = arvados.keep.KeepClient(api_client=api_client)
    block = keep_client.get(src, num_retries=args.retries)
    dest.write(block)
logger = <Logger arvados.arv-get (WARNING)>
parser = ArgumentParser(prog='pysdk_pdoc.py', usage=None, description='Copy data from Keep to a local file or pipe.', formatter_class=<class 'argparse.HelpFormatter'>, conflict_handler='error', add_help=True)
group = <argparse._MutuallyExclusiveGroup object>
def parse_arguments(arguments, stdout, stderr):
def parse_arguments(arguments, stdout, stderr):
    """Parse the arv-get command line and normalize the result.

    Arguments:
      arguments: list of argument strings handed to parser.parse_args();
        None lets argparse read the process arguments.
      stdout: output stream; only its isatty() method is used here.
      stderr: error stream; only its isatty() method is used here.

    Returns:
      The argparse namespace, adjusted as follows:
        * "r" is forced on when the locator ends with the path separator;
        * when not recursive and the destination is a directory (or ends
          with a separator), the locator's base name is appended to it;
        * "/dev/stdout" is rewritten to "-", and "-" implies "f";
        * any other destination has trailing separators stripped;
        * "progress" is turned on by default when stderr is a tty and the
          data is not going to a tty on stdout.

    A recursive download that will write data (no -n) to a destination that
    is not an existing directory is rejected through parser.error().
    """
    args = parser.parse_args(arguments)

    # endswith() instead of indexing [-1]: an empty-string locator or
    # destination must not raise IndexError here.
    if args.locator.endswith(os.sep):
        args.r = True
    # os.path.isdir('') is False, so an empty destination is rejected too.
    if args.r and not args.n and not os.path.isdir(args.destination):
        parser.error('Destination is not a directory.')
    if not args.r and (os.path.isdir(args.destination) or
                       args.destination.endswith(os.path.sep)):
        args.destination = os.path.join(args.destination,
                                        os.path.basename(args.locator))
        logger.debug("Appended source file name to destination directory: %s",
                     args.destination)

    if args.destination == '/dev/stdout':
        args.destination = "-"

    if args.destination == '-':
        # Normally you have to use -f to write to a file (or device) that
        # already exists, but "-" and "/dev/stdout" are common enough to
        # merit a special exception.
        args.f = True
    else:
        args.destination = args.destination.rstrip(os.sep)

    # Turn on --progress by default if stderr is a tty and output is
    # either going to a named file, or going (via stdout) to something
    # that isn't a tty.
    if (not (args.batch_progress or args.no_progress)
        and stderr.isatty()
        and (args.destination != '-'
             or not stdout.isatty())):
        args.progress = True
    return args
def main( arguments=None, stdout=<_io.StringIO object>, stderr=<_io.StringIO object>):
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Entry point for arv-get: copy data from Keep to a local file or pipe.

    A locator without a path part writes the collection manifest (or a
    single block, see write_block_or_manifest()). A locator with a path
    part downloads one file, or every file below a subcollection.

    Arguments:
      arguments: list of command-line argument strings, or None to let
        argparse read the process arguments.
      stdout: stream that receives the data when the destination is "-".
        When this is sys.stdout itself and it has a "buffer" attribute,
        that binary buffer is used instead.
      stderr: stream that receives progress reports and file hashes.

    Returns:
      1 when an error was logged, otherwise 0.
      NOTE(review): a KeyboardInterrupt during download removes the partial
      file and still returns 0 -- confirm that this is intended.
    """
    if stdout is sys.stdout and hasattr(stdout, 'buffer'):
        # in Python 3, write to stdout as binary
        stdout = stdout.buffer

    args = parse_arguments(arguments, stdout, stderr)
    # Each -v lowers the threshold by one level: WARNING -> INFO -> DEBUG.
    logger.setLevel(logging.WARNING - 10 * args.v)

    request_id = arvados.util.new_request_id()
    logger.info('X-Request-Id: '+request_id)

    api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)

    # Split "collection/path" at the first slash; group 2 keeps its slash.
    r = re.search(r'^(.*?)(/.*)?$', args.locator)
    col_loc = r.group(1)
    get_prefix = r.group(2)
    if args.r and not get_prefix:
        get_prefix = os.sep

    # User asked to download the collection's manifest
    if not get_prefix:
        if not args.n:
            open_flags = os.O_CREAT | os.O_WRONLY
            if not args.f:
                # Without -f, refuse to write over an existing file.
                open_flags |= os.O_EXCL
            try:
                if args.destination == "-":
                    write_block_or_manifest(
                        dest=stdout, src=col_loc,
                        api_client=api_client, args=args)
                else:
                    out_fd = os.open(args.destination, open_flags)
                    with os.fdopen(out_fd, 'wb') as out_file:
                        write_block_or_manifest(
                            dest=out_file, src=col_loc,
                            api_client=api_client, args=args)
            except (IOError, OSError) as error:
                logger.error("can't write to '{}': {}".format(args.destination, error))
                return 1
            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
                logger.error("failed to download '{}': {}".format(col_loc, error))
                return 1
            except arvados.errors.ArgumentError as error:
                if 'Argument to CollectionReader' in str(error):
                    logger.error("error reading collection: {}".format(error))
                    return 1
                else:
                    raise
        return 0

    try:
        # The block cache is sized at 64 MiB per download thread, plus one.
        reader = arvados.CollectionReader(
            col_loc, api_client=api_client, num_retries=args.retries,
            keep_client=arvados.keep.KeepClient(
                block_cache=arvados.keep.KeepBlockCache((args.threads+1)*64 * 1024 * 1024),
                num_prefetch_threads=args.threads))
    except Exception as error:
        logger.error("failed to read collection: {}".format(error))
        return 1

    # Scan the collection. Make an array of (stream, file, local
    # destination filename) tuples, and add up total size to extract.
    todo = []
    todo_bytes = 0
    try:
        if get_prefix == os.sep:
            item = reader
        else:
            item = reader.find('.' + get_prefix)

        if isinstance(item, (arvados.collection.Subcollection,
                             arvados.collection.CollectionReader)):
            # If the user asked for a file and we got a subcollection, error out.
            if get_prefix[-1] != os.sep:
                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
                return 1
            # If the user asked stdout as a destination, error out.
            elif args.destination == '-':
                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
                return 1
            # User asked for a subcollection, and that's what was found. Add up total size
            # to download.
            for s, f in files_in_collection(item):
                # Drop the requested prefix from the stream path so the
                # local path is relative to the destination directory.
                dest_path = os.path.join(
                    args.destination,
                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
                if (not (args.n or args.f or args.skip_existing) and
                    os.path.exists(dest_path)):
                    logger.error('Local file %s already exists.' % (dest_path,))
                    return 1
                todo.append((s, f, dest_path))
                todo_bytes += f.size()
        elif isinstance(item, arvados.arvfile.ArvadosFile):
            todo.append((item.parent, item, args.destination))
            todo_bytes += item.size()
        else:
            logger.error("'{}' not found.".format('.' + get_prefix))
            return 1
    except (IOError, arvados.errors.NotFoundError) as e:
        logger.error(e)
        return 1

    out_bytes = 0
    for s, f, outfilename in todo:
        outfile = None
        digestor = None
        if args.hash:
            # Build the digest object before touching the local file, so an
            # unsupported algorithm name is reported cleanly instead of
            # raising after the output file was created or truncated.
            try:
                digestor = hashlib.new(args.hash)
            except ValueError as error:
                logger.error("unsupported hash algorithm '{}': {}".format(args.hash, error))
                return 1
        if not args.n:
            if outfilename == "-":
                outfile = stdout
            else:
                if args.skip_existing and os.path.exists(outfilename):
                    logger.debug('Local file %s exists. Skipping.', outfilename)
                    continue
                elif not args.f and (os.path.isfile(outfilename) or
                                     os.path.isdir(outfilename)):
                    # Good thing we looked again: apparently this file wasn't
                    # here yet when we checked earlier.
                    logger.error('Local file %s already exists.' % (outfilename,))
                    return 1
                if args.r:
                    pathlib.Path(outfilename).parent.mkdir(parents=True, exist_ok=True)
                try:
                    outfile = open(outfilename, 'wb')
                except Exception as error:
                    logger.error('Open(%s) failed: %s' % (outfilename, error))
                    return 1
        try:
            with s.open(f.name, 'rb') as file_reader:
                for data in file_reader.readall():
                    if outfile:
                        outfile.write(data)
                    if digestor:
                        digestor.update(data)
                    out_bytes += len(data)
                    if args.progress:
                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
                                     (out_bytes >> 20,
                                      todo_bytes >> 20,
                                      (100
                                       if todo_bytes==0
                                       else 100.0*out_bytes/todo_bytes)))
                    elif args.batch_progress:
                        stderr.write('%s %d read %d total %d\n' %
                                     (sys.argv[0], os.getpid(),
                                      out_bytes, todo_bytes))
            if digestor:
                stderr.write("%s  %s/%s\n"
                             % (digestor.hexdigest(), s.stream_name(), f.name))
        except KeyboardInterrupt:
            # Remove the partial download, but never one of the standard
            # streams (file descriptors 0-2).
            if outfile and (outfile.fileno() > 2) and not outfile.closed:
                os.unlink(outfile.name)
            break
        finally:
            if outfile is not None and outfile is not stdout:
                outfile.close()

    if args.progress:
        # Finish the "\r"-rewritten progress line.
        stderr.write('\n')
    return 0
def files_in_collection(c):
def files_in_collection(c):
    """Yield a (parent collection, file) pair for every file below c.

    Files directly inside c come first, followed by the contents of each
    subcollection (recursively); within each of those two groups, entries
    are ordered by upper-cased name.
    """
    def sort_key(name):
        # False sorts before True, so plain files precede subcollections.
        return (isinstance(c[name], arvados.collection.Subcollection),
                name.upper())

    for name in sorted(c.keys(), key=sort_key):
        entry = c[name]
        if isinstance(entry, arvados.arvfile.ArvadosFile):
            yield (c, entry)
        elif isinstance(entry, arvados.collection.Subcollection):
            yield from files_in_collection(entry)
def write_block_or_manifest(dest, src, api_client, args):
def write_block_or_manifest(dest, src, api_client, args):
    """Write the data identified by src to the binary stream dest.

    A src containing '+A' is handled as a block locator and the block's
    content is written. Any other src is handled as a collection UUID or
    portable data hash, and the encoded manifest text is written, stripped
    when args.strip_manifest is set. args.retries is passed on as the
    retry count in both cases.
    """
    if '+A' not in src:
        # collection UUID or portable data hash
        collection = arvados.CollectionReader(
            src, api_client=api_client, num_retries=args.retries)
        manifest = collection.manifest_text(strip=args.strip_manifest)
        dest.write(manifest.encode())
        return

    # block locator
    keep_client = arvados.keep.KeepClient(api_client=api_client)
    block = keep_client.get(src, num_retries=args.retries)
    dest.write(block)