arvados.commands.put

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4
   5import argparse
   6import arvados
   7import arvados.collection
   8import base64
   9import ciso8601
  10import copy
  11import datetime
  12import errno
  13import fcntl
  14import fnmatch
  15import hashlib
  16import json
  17import logging
  18import os
  19import pwd
  20import re
  21import signal
  22import socket
  23import sys
  24import tempfile
  25import threading
  26import time
  27import traceback
  28
  29from pathlib import Path
  30
  31import arvados.util
  32import arvados.commands._util as arv_cmd
  33
  34from apiclient import errors as apiclient_errors
  35from arvados._internal import basedirs
  36from arvados._version import __version__
  37
  38api_client = None
  39
  40upload_opts = argparse.ArgumentParser(add_help=False)
  41
  42upload_opts.add_argument('--version', action='version',
  43                         version="%s %s" % (sys.argv[0], __version__),
  44                         help='Print version and exit.')
  45upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
  46                         help="""
  47Local file or directory. If path is a directory reference with a trailing
  48slash, then just upload the directory's contents; otherwise upload the
  49directory itself. Default: read from standard input.
  50""")
  51
  52_group = upload_opts.add_mutually_exclusive_group()
  53
  54_group.add_argument('--max-manifest-depth', type=int, metavar='N',
  55                    default=-1, help=argparse.SUPPRESS)
  56
  57_group.add_argument('--normalize', action='store_true',
  58                    help="""
  59Normalize the manifest by re-ordering files and streams after writing
  60data.
  61""")
  62
  63_group.add_argument('--dry-run', action='store_true', default=False,
  64                    help="""
  65Don't actually upload files, but only check if any file should be
  66uploaded. Exit with code=2 when files are pending for upload.
  67""")
  68
  69_group = upload_opts.add_mutually_exclusive_group()
  70
  71_group.add_argument('--as-stream', action='store_true', dest='stream',
  72                    help="""
  73Synonym for --stream.
  74""")
  75
  76_group.add_argument('--stream', action='store_true',
  77                    help="""
  78Store the file content and display the resulting manifest on
  79stdout. Do not save a Collection object in Arvados.
  80""")
  81
  82_group.add_argument('--as-manifest', action='store_true', dest='manifest',
  83                    help="""
  84Synonym for --manifest.
  85""")
  86
  87_group.add_argument('--in-manifest', action='store_true', dest='manifest',
  88                    help="""
  89Synonym for --manifest.
  90""")
  91
  92_group.add_argument('--manifest', action='store_true',
  93                    help="""
  94Store the file data and resulting manifest in Keep, save a Collection
  95object in Arvados, and display the manifest locator (Collection uuid)
  96on stdout. This is the default behavior.
  97""")
  98
  99_group.add_argument('--as-raw', action='store_true', dest='raw',
 100                    help="""
 101Synonym for --raw.
 102""")
 103
 104_group.add_argument('--raw', action='store_true',
 105                    help="""
 106Store the file content and display the data block locators on stdout,
 107separated by commas, with a trailing newline. Do not store a
 108manifest.
 109""")
 110
 111upload_opts.add_argument('--update-collection', type=str, default=None,
 112                         dest='update_collection', metavar="UUID", help="""
 113Update an existing collection identified by the given Arvados collection
 114UUID. All new local files will be uploaded.
 115""")
 116
 117upload_opts.add_argument('--use-filename', type=str, default=None,
 118                         dest='filename', help="""
 119Synonym for --filename.
 120""")
 121
 122upload_opts.add_argument('--filename', type=str, default=None,
 123                         help="""
 124Use the given filename in the manifest, instead of the name of the
 125local file. This is useful when "-" or "/dev/stdin" is given as an
 126input file. It can be used only if there is exactly one path given and
 127it is not a directory. Implies --manifest.
 128""")
 129
 130upload_opts.add_argument('--portable-data-hash', action='store_true',
 131                         help="""
 132Print the portable data hash instead of the Arvados UUID for the collection
 133created by the upload.
 134""")
 135
 136upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
 137                         help="""
 138Set the replication level for the new collection: how many different
 139physical storage devices (e.g., disks) should have a copy of each data
 140block. Default is to use the server-provided default (if any) or 2.
 141""")
 142
 143upload_opts.add_argument(
 144    '--storage-classes',
 145    type=arv_cmd.UniqueSplit(),
 146    help="""
 147Specify comma separated list of storage classes to be used when saving data to Keep.
 148""")
 149
 150upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
 151                         help="""
 152Set the number of upload threads to be used. Take into account that
 153using lots of threads will increase the RAM requirements. Default is
 154to use 2 threads.
 155On high latency installations, using a greater number will improve
 156overall throughput.
 157""")
 158
 159upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
 160                      action='append', help="""
  161Exclude files and directories whose names match the given glob pattern. When
  162using a path-like pattern such as 'subdir/*.txt', all text files inside the
  163'subdir' directory (relative to the provided input dirs) will be excluded.
  164When using a filename pattern like '*.txt', any text file will be excluded
  165no matter where it is placed.
 166For the special case of needing to exclude only files or dirs directly below
 167the given input directory, you can use a pattern like './exclude_this.gif'.
 168You can specify multiple patterns by using this argument more than once.
 169""")
 170
 171_group = upload_opts.add_mutually_exclusive_group()
 172_group.add_argument('--follow-links', action='store_true', default=True,
 173                    dest='follow_links', help="""
 174Follow file and directory symlinks (default).
 175""")
 176_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
 177                    help="""
 178Ignore file and directory symlinks. Even paths given explicitly on the
 179command line will be skipped if they are symlinks.
 180""")
 181
 182
 183run_opts = argparse.ArgumentParser(add_help=False)
 184
 185run_opts.add_argument('--project-uuid', metavar='UUID', help="""
 186Store the collection in the specified project, instead of your Home
 187project.
 188""")
 189
 190run_opts.add_argument('--name', help="""
 191Save the collection with the specified name.
 192""")
 193
 194_group = run_opts.add_mutually_exclusive_group()
 195_group.add_argument('--progress', action='store_true',
 196                    help="""
 197Display human-readable progress on stderr (bytes and, if possible,
 198percentage of total data size). This is the default behavior when
 199stderr is a tty.
 200""")
 201
 202_group.add_argument('--no-progress', action='store_true',
 203                    help="""
 204Do not display human-readable progress on stderr, even if stderr is a
 205tty.
 206""")
 207
 208_group.add_argument('--batch-progress', action='store_true',
 209                    help="""
 210Display machine-readable progress on stderr (bytes and, if known,
 211total data size).
 212""")
 213
 214run_opts.add_argument('--silent', action='store_true',
 215                      help="""
 216Do not print any debug messages to console. (Any error messages will
 217still be displayed.)
 218""")
 219
 220run_opts.add_argument('--batch', action='store_true', default=False,
 221                      help="""
 222Retries with '--no-resume --no-cache' if cached state contains invalid/expired
 223block signatures.
 224""")
 225
 226_group = run_opts.add_mutually_exclusive_group()
 227_group.add_argument('--resume', action='store_true', default=True,
 228                    help="""
 229Continue interrupted uploads from cached state (default).
 230""")
 231_group.add_argument('--no-resume', action='store_false', dest='resume',
 232                    help="""
 233Do not continue interrupted uploads from cached state.
 234""")
 235
 236_group = run_opts.add_mutually_exclusive_group()
 237_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
 238                    help="""
 239Save upload state in a cache file for resuming (default).
 240""")
 241_group.add_argument('--no-cache', action='store_false', dest='use_cache',
 242                    help="""
 243Do not save upload state in a cache file for resuming.
 244""")
 245
 246_group = upload_opts.add_mutually_exclusive_group()
 247_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
 248                    help="""
 249Set the trash date of the resulting collection to an absolute date in the future.
 250The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
  251Timezone information can be added. If not, the provided date/time is assumed to be in the local system's timezone.
 252""")
 253_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
 254                    help="""
  255Set the trash date of the resulting collection to the given number of days
  256after the date/time that the upload process finishes.
 257""")
 258
 259arg_parser = argparse.ArgumentParser(
 260    description='Copy data from the local filesystem to Keep.',
 261    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
 262
 263def parse_arguments(arguments):
 264    args = arg_parser.parse_args(arguments)
 265
 266    if len(args.paths) == 0:
 267        args.paths = ['-']
 268
 269    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
 270
 271    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
 272        arg_parser.error("""
 273    --filename argument cannot be used when storing a directory or
 274    multiple files.
 275    """)
 276
 277    # Turn on --progress by default if stderr is a tty.
 278    if (not (args.batch_progress or args.no_progress or args.silent)
 279        and os.isatty(sys.stderr.fileno())):
 280        args.progress = True
 281
 282    # Turn off --resume (default) if --no-cache is used.
 283    if not args.use_cache:
 284        args.resume = False
 285
 286    if args.paths == ['-']:
 287        if args.update_collection:
 288            arg_parser.error("""
 289    --update-collection cannot be used when reading from stdin.
 290    """)
 291        args.resume = False
 292        args.use_cache = False
 293        if not args.filename:
 294            args.filename = 'stdin'
 295
 296    # Remove possible duplicated patterns
 297    if len(args.exclude) > 0:
 298        args.exclude = list(set(args.exclude))
 299
 300    return args
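
# Illustrative sketch (not executed here) of how parse_arguments() normalizes
# stdin uploads. A hypothetical call like
#   args = parse_arguments(['--no-cache', '/dev/stdin'])
# leaves args.paths == ['-'], args.use_cache == False, args.resume == False
# and args.filename == 'stdin', because stdin uploads can be neither cached
# nor resumed.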
 301
 302
 303class PathDoesNotExistError(Exception):
 304    pass
 305
 306
 307class CollectionUpdateError(Exception):
 308    pass
 309
 310
 311class ResumeCacheConflict(Exception):
 312    pass
 313
 314
 315class ResumeCacheInvalidError(Exception):
 316    pass
 317
 318class ArvPutArgumentConflict(Exception):
 319    pass
 320
 321
 322class ArvPutUploadIsPending(Exception):
 323    pass
 324
 325
 326class ArvPutUploadNotPending(Exception):
 327    pass
 328
 329
 330class FileUploadList(list):
 331    def __init__(self, dry_run=False):
 332        list.__init__(self)
 333        self.dry_run = dry_run
 334
 335    def append(self, other):
 336        if self.dry_run:
 337            raise ArvPutUploadIsPending()
 338        super(FileUploadList, self).append(other)
 339
 340
 341# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
 342class ArvPutLogFormatter(logging.Formatter):
 343    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
 344    err_fmtr = None
 345    request_id_informed = False
 346
 347    def __init__(self, request_id):
 348        self.err_fmtr = logging.Formatter(
 349            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
 350            arvados.log_date_format)
 351
 352    def format(self, record):
 353        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
 354            self.request_id_informed = True
 355            return self.err_fmtr.format(record)
 356        return self.std_fmtr.format(record)
 357
 358
 359class ResumeCache(object):
 360    CACHE_DIR = 'arv-put'
 361
 362    def __init__(self, file_spec):
 363        self.cache_file = open(file_spec, 'a+')
 364        self._lock_file(self.cache_file)
 365        self.filename = self.cache_file.name
 366
 367    @classmethod
 368    def make_path(cls, args):
 369        md5 = hashlib.md5()
 370        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
 371        realpaths = sorted(os.path.realpath(path) for path in args.paths)
 372        md5.update(b'\0'.join([p.encode() for p in realpaths]))
 373        if any(os.path.isdir(path) for path in realpaths):
 374            md5.update(b'-1')
 375        elif args.filename:
 376            md5.update(args.filename.encode())
 377        cache_path = Path(cls.CACHE_DIR)
 378        if len(cache_path.parts) == 1:
 379            cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path)
 380        else:
 381            # Note this is a noop if cache_path is absolute, which is what we want.
 382            cache_path = Path.home() / cache_path
 383            cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
 384        return str(cache_path / md5.hexdigest())
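
    # Sketch of the resulting cache path (the exact parent directory comes
    # from BaseDirectories('CACHE') and may differ per system): something like
    #   ~/.cache/arvados/arv-put/<md5-hex-digest>
    # where the digest covers ARVADOS_API_HOST, the sorted real paths, and
    # either '-1' (when any directory is present) or the --filename value.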
 385
 386    def _lock_file(self, fileobj):
 387        try:
 388            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
 389        except IOError:
 390            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
 391
 392    def load(self):
 393        self.cache_file.seek(0)
 394        return json.load(self.cache_file)
 395
 396    def check_cache(self, api_client=None, num_retries=0):
 397        try:
 398            state = self.load()
 399            locator = None
 400            try:
 401                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
 402                    locator = state["_finished_streams"][0][1][0]
 403                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
 404                    locator = state["_current_stream_locators"][0]
 405                if locator is not None:
 406                    kc = arvados.keep.KeepClient(api_client=api_client)
 407                    kc.head(locator, num_retries=num_retries)
 408            except Exception as e:
 409                self.restart()
  410        except ValueError:
 411            pass
 412
 413    def save(self, data):
 414        try:
 415            new_cache_fd, new_cache_name = tempfile.mkstemp(
 416                dir=os.path.dirname(self.filename))
 417            self._lock_file(new_cache_fd)
 418            new_cache = os.fdopen(new_cache_fd, 'r+')
 419            json.dump(data, new_cache)
 420            os.rename(new_cache_name, self.filename)
 421        except (IOError, OSError, ResumeCacheConflict):
 422            try:
 423                os.unlink(new_cache_name)
 424            except NameError:  # mkstemp failed.
 425                pass
 426        else:
 427            self.cache_file.close()
 428            self.cache_file = new_cache
 429
 430    def close(self):
 431        self.cache_file.close()
 432
 433    def destroy(self):
 434        try:
 435            os.unlink(self.filename)
 436        except OSError as error:
 437            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
 438                raise
 439        self.close()
 440
 441    def restart(self):
 442        self.destroy()
 443        self.__init__(self.filename)
 444
 445
 446class ArvPutUploadJob(object):
 447    CACHE_DIR = 'arv-put'
 448    EMPTY_STATE = {
 449        'manifest' : None, # Last saved manifest checkpoint
 450        'files' : {} # Previous run file list: {path : {size, mtime}}
 451    }
 452
 453    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
 454                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
 455                 ensure_unique_name=False, num_retries=None,
 456                 put_threads=None, replication_desired=None, filename=None,
 457                 update_time=60.0, update_collection=None, storage_classes=None,
 458                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
 459                 follow_links=True, exclude_paths=[], exclude_names=None,
 460                 trash_at=None):
 461        self.paths = paths
 462        self.resume = resume
 463        self.use_cache = use_cache
 464        self.batch_mode = batch_mode
 465        self.update = False
 466        self.reporter = reporter
  467        # This will be set to 0 before we start counting, if no special files are
  468        # going to be read.
 469        self.bytes_expected = None
 470        self.bytes_written = 0
 471        self.bytes_skipped = 0
 472        self.name = name
 473        self.owner_uuid = owner_uuid
 474        self.ensure_unique_name = ensure_unique_name
 475        self.num_retries = num_retries
 476        self.replication_desired = replication_desired
 477        self.put_threads = put_threads
 478        self.filename = filename
 479        self.storage_classes = storage_classes
 480        self._api_client = api_client
 481        self._state_lock = threading.Lock()
 482        self._state = None # Previous run state (file list & manifest)
 483        self._current_files = [] # Current run file list
 484        self._cache_file = None
 485        self._collection_lock = threading.Lock()
 486        self._remote_collection = None # Collection being updated (if asked)
 487        self._local_collection = None # Collection from previous run manifest
 488        self._file_paths = set() # Files to be updated in remote collection
 489        self._stop_checkpointer = threading.Event()
 490        self._checkpointer = threading.Thread(target=self._update_task)
 491        self._checkpointer.daemon = True
  492        self._update_task_time = update_time  # How many seconds to wait between update runs
 493        self._files_to_upload = FileUploadList(dry_run=dry_run)
 494        self._upload_started = False
 495        self.logger = logger
 496        self.dry_run = dry_run
 497        self._checkpoint_before_quit = True
 498        self.follow_links = follow_links
 499        self.exclude_paths = exclude_paths
 500        self.exclude_names = exclude_names
 501        self._trash_at = trash_at
 502
 503        if self._trash_at is not None:
 504            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
 505                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
 506            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
 507                raise TypeError('provided trash_at datetime should be timezone-naive')
 508
 509        if not self.use_cache and self.resume:
 510            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
 511
 512        # Check for obvious dry-run responses
 513        if self.dry_run and (not self.use_cache or not self.resume):
 514            raise ArvPutUploadIsPending()
 515
 516        # Load cached data if any and if needed
 517        self._setup_state(update_collection)
 518
 519        # Build the upload file list, excluding requested files and counting the
 520        # bytes expected to be uploaded.
 521        self._build_upload_list()
 522
 523    def _build_upload_list(self):
 524        """
 525        Scan the requested paths to count file sizes, excluding requested files
 526        and dirs and building the upload file list.
 527        """
  528        # If there aren't any special files to be read, reset the total bytes count
  529        # to zero so we can start counting.
 530        if not any([p for p in self.paths
 531                    if not (os.path.isfile(p) or os.path.isdir(p))]):
 532            self.bytes_expected = 0
 533
 534        for path in self.paths:
  535            # Test for stdin first, in case some file named '-' exists
 536            if path == '-':
 537                if self.dry_run:
 538                    raise ArvPutUploadIsPending()
 539                self._write_stdin(self.filename or 'stdin')
 540            elif not os.path.exists(path):
  541                raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
 542            elif (not self.follow_links) and os.path.islink(path):
 543                self.logger.warning("Skipping symlink '{}'".format(path))
 544                continue
 545            elif os.path.isdir(path):
 546                # Use absolute paths on cache index so CWD doesn't interfere
 547                # with the caching logic.
 548                orig_path = path
 549                path = os.path.abspath(path)
 550                if orig_path[-1:] == os.sep:
 551                    # When passing a directory reference with a trailing slash,
 552                    # its contents should be uploaded directly to the
 553                    # collection's root.
 554                    prefixdir = path
 555                else:
 556                    # When passing a directory reference with no trailing slash,
 557                    # upload the directory to the collection's root.
 558                    prefixdir = os.path.dirname(path)
 559                prefixdir += os.sep
 560                for root, dirs, files in os.walk(path,
 561                                                 followlinks=self.follow_links):
 562                    root_relpath = os.path.relpath(root, path)
 563                    if root_relpath == '.':
 564                        root_relpath = ''
 565                    # Exclude files/dirs by full path matching pattern
 566                    if self.exclude_paths:
 567                        dirs[:] = [d for d in dirs
 568                                   if not any(pathname_match(
 569                                           os.path.join(root_relpath, d), pat)
 570                                              for pat in self.exclude_paths)]
 571                        files = [f for f in files
 572                                 if not any(pathname_match(
 573                                         os.path.join(root_relpath, f), pat)
 574                                            for pat in self.exclude_paths)]
 575                    # Exclude files/dirs by name matching pattern
 576                    if self.exclude_names is not None:
 577                        dirs[:] = [d for d in dirs
 578                                   if not self.exclude_names.match(d)]
 579                        files = [f for f in files
 580                                 if not self.exclude_names.match(f)]
 581                    # Make os.walk()'s dir traversing order deterministic
 582                    dirs.sort()
 583                    files.sort()
 584                    for f in files:
 585                        filepath = os.path.join(root, f)
 586                        if not os.path.isfile(filepath):
 587                            self.logger.warning("Skipping non-regular file '{}'".format(filepath))
 588                            continue
 589                        # Add its size to the total bytes count (if applicable)
 590                        if self.follow_links or (not os.path.islink(filepath)):
 591                            if self.bytes_expected is not None:
 592                                self.bytes_expected += os.path.getsize(filepath)
 593                        self._check_file(filepath,
 594                                         os.path.join(root[len(prefixdir):], f))
 595            else:
 596                filepath = os.path.abspath(path)
 597                # Add its size to the total bytes count (if applicable)
 598                if self.follow_links or (not os.path.islink(filepath)):
 599                    if self.bytes_expected is not None:
 600                        self.bytes_expected += os.path.getsize(filepath)
 601                self._check_file(filepath,
 602                                 self.filename or os.path.basename(path))
  603        # If dry-run mode is on and we got up to this point, notify that there
  604        # aren't any files to upload.
 605        if self.dry_run:
 606            raise ArvPutUploadNotPending()
 607        # Remove local_collection's files that don't exist locally anymore, so the
 608        # bytes_written count is correct.
 609        for f in self.collection_file_paths(self._local_collection,
 610                                            path_prefix=""):
  611            if f != 'stdin' and f != self.filename and f not in self._file_paths:
 612                self._local_collection.remove(f)
 613
 614    def start(self, save_collection):
 615        """
  616        Start the supporting thread and begin uploading files.
 617        """
 618        self._checkpointer.start()
 619        try:
 620            # Update bytes_written from current local collection and
 621            # report initial progress.
 622            self._update()
 623            # Actual file upload
 624            self._upload_started = True # Used by the update thread to start checkpointing
 625            self._upload_files()
 626        except (SystemExit, Exception) as e:
 627            self._checkpoint_before_quit = False
 628            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
 629            # Note: We're expecting SystemExit instead of
 630            # KeyboardInterrupt because we have a custom signal
  631            # handler in place that raises SystemExit with the caught
 632            # signal's code.
 633            if isinstance(e, PathDoesNotExistError):
 634                # We aren't interested in the traceback for this case
 635                pass
 636            elif not isinstance(e, SystemExit) or e.code != -2:
 637                self.logger.warning("Abnormal termination:\n{}".format(
 638                    traceback.format_exc()))
 639            raise
 640        finally:
 641            if not self.dry_run:
 642                # Stop the thread before doing anything else
 643                self._stop_checkpointer.set()
 644                self._checkpointer.join()
 645                if self._checkpoint_before_quit:
 646                    # Commit all pending blocks & one last _update()
 647                    self._local_collection.manifest_text()
 648                    self._update(final=True)
 649                    if save_collection:
 650                        self.save_collection()
 651            if self.use_cache:
 652                self._cache_file.close()
 653
 654    def _collection_trash_at(self):
 655        """
 656        Returns the trash date that the collection should use at save time.
 657        Takes into account absolute/relative trash_at values requested
 658        by the user.
 659        """
 660        if type(self._trash_at) == datetime.timedelta:
 661            # Get an absolute datetime for trash_at
 662            return datetime.datetime.utcnow() + self._trash_at
 663        return self._trash_at
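
    # Sketch: with --trash-after 7, self._trash_at is a 7-day timedelta and
    # _collection_trash_at() returns roughly utcnow() + 7 days at save time;
    # with --trash-at, main() has already converted the value to a
    # timezone-naive UTC datetime, which is returned unchanged.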
 664
 665    def save_collection(self):
 666        if self.update:
 667            # Check if files should be updated on the remote collection.
 668            for fp in self._file_paths:
 669                remote_file = self._remote_collection.find(fp)
 670                if not remote_file:
  671                    # File doesn't exist on the remote collection, copy it.
 672                    self._remote_collection.copy(fp, fp, self._local_collection)
 673                elif remote_file != self._local_collection.find(fp):
  674                    # A different file exists on the remote collection, overwrite it.
 675                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
 676                else:
  677                    # The file already exists on the remote collection, skip it.
 678                    pass
 679            self._remote_collection.save(num_retries=self.num_retries,
 680                                         trash_at=self._collection_trash_at())
 681        else:
 682            if len(self._local_collection) == 0:
 683                self.logger.warning("No files were uploaded, skipping collection creation.")
 684                return
 685            self._local_collection.save_new(
 686                name=self.name, owner_uuid=self.owner_uuid,
 687                ensure_unique_name=self.ensure_unique_name,
 688                num_retries=self.num_retries,
 689                trash_at=self._collection_trash_at())
 690
 691    def destroy_cache(self):
 692        if self.use_cache:
 693            try:
 694                os.unlink(self._cache_filename)
 695            except OSError as error:
 696                # That's what we wanted anyway.
 697                if error.errno != errno.ENOENT:
 698                    raise
 699            self._cache_file.close()
 700
 701    def _collection_size(self, collection):
 702        """
 703        Recursively get the total size of the collection
 704        """
 705        size = 0
 706        for item in collection.values():
 707            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
 708                size += self._collection_size(item)
 709            else:
 710                size += item.size()
 711        return size
 712
 713    def _update_task(self):
 714        """
 715        Periodically called support task. File uploading is
 716        asynchronous so we poll status from the collection.
 717        """
 718        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
 719            self._update()
 720
 721    def _update(self, final=False):
 722        """
 723        Update cached manifest text and report progress.
 724        """
 725        if self._upload_started:
 726            with self._collection_lock:
 727                self.bytes_written = self._collection_size(self._local_collection)
 728                if self.use_cache:
 729                    if final:
 730                        manifest = self._local_collection.manifest_text()
 731                    else:
  732                        # Get the manifest text without committing pending blocks
 733                        manifest = self._local_collection.manifest_text(strip=False,
 734                                                                        normalize=False,
 735                                                                        only_committed=True)
 736                    # Update cache
 737                    with self._state_lock:
 738                        self._state['manifest'] = manifest
 739            if self.use_cache:
 740                try:
 741                    self._save_state()
 742                except Exception as e:
 743                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
 744            # Keep remote collection's trash_at attribute synced when using relative expire dates
 745            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
 746                try:
 747                    self._api_client.collections().update(
 748                        uuid=self._remote_collection.manifest_locator(),
 749                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
 750                    ).execute(num_retries=self.num_retries)
 751                except Exception as e:
 752                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
 753        else:
 754            self.bytes_written = self.bytes_skipped
 755        # Call the reporter, if any
 756        self.report_progress()
 757
 758    def report_progress(self):
 759        if self.reporter is not None:
 760            self.reporter(self.bytes_written, self.bytes_expected)
 761
 762    def _write_stdin(self, filename):
 763        output = self._local_collection.open(filename, 'wb')
 764        self._write(sys.stdin.buffer, output)
 765        output.close()
 766
 767    def _check_file(self, source, filename):
 768        """
 769        Check if this file needs to be uploaded
 770        """
 771        # Ignore symlinks when requested
 772        if (not self.follow_links) and os.path.islink(source):
 773            return
 774        resume_offset = 0
 775        should_upload = False
 776        new_file_in_cache = False
 777        # Record file path for updating the remote collection before exiting
 778        self._file_paths.add(filename)
 779
 780        with self._state_lock:
  781            # If there's no previously cached data for this file, store it for a
  782            # possible repeated run.
 783            if source not in self._state['files']:
 784                self._state['files'][source] = {
 785                    'mtime': os.path.getmtime(source),
 786                    'size' : os.path.getsize(source)
 787                }
 788                new_file_in_cache = True
 789            cached_file_data = self._state['files'][source]
 790
 791        # Check if file was already uploaded (at least partially)
 792        file_in_local_collection = self._local_collection.find(filename)
 793
 794        # If not resuming, upload the full file.
 795        if not self.resume:
 796            should_upload = True
  797        # New file detected since the last run, upload it.
 798        elif new_file_in_cache:
 799            should_upload = True
 800        # Local file didn't change from last run.
 801        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
 802            if not file_in_local_collection:
 803                # File not uploaded yet, upload it completely
 804                should_upload = True
 805            elif file_in_local_collection.permission_expired():
 806                # Permission token expired, re-upload file. This will change whenever
  807                # we have an API for refreshing tokens.
 808                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
 809                should_upload = True
 810                self._local_collection.remove(filename)
 811            elif cached_file_data['size'] == file_in_local_collection.size():
 812                # File already there, skip it.
 813                self.bytes_skipped += cached_file_data['size']
 814            elif cached_file_data['size'] > file_in_local_collection.size():
 815                # File partially uploaded, resume!
 816                resume_offset = file_in_local_collection.size()
 817                self.bytes_skipped += resume_offset
 818                should_upload = True
 819            else:
 820                # Inconsistent cache, re-upload the file
 821                should_upload = True
 822                self._local_collection.remove(filename)
 823                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
 824        # Local file differs from cached data, re-upload it.
 825        else:
 826            if file_in_local_collection:
 827                self._local_collection.remove(filename)
 828            should_upload = True
 829
 830        if should_upload:
 831            try:
 832                self._files_to_upload.append((source, resume_offset, filename))
 833            except ArvPutUploadIsPending:
  834                # This could happen when running in dry-run mode; close the cache
  835                # file to avoid locking issues.
 836                self._cache_file.close()
 837                raise
 838
 839    def _upload_files(self):
 840        for source, resume_offset, filename in self._files_to_upload:
 841            with open(source, 'rb') as source_fd:
 842                with self._state_lock:
 843                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
 844                    self._state['files'][source]['size'] = os.path.getsize(source)
 845                if resume_offset > 0:
 846                    # Start upload where we left off
 847                    output = self._local_collection.open(filename, 'ab')
 848                    source_fd.seek(resume_offset)
 849                else:
 850                    # Start from scratch
 851                    output = self._local_collection.open(filename, 'wb')
 852                self._write(source_fd, output)
 853                output.close(flush=False)
 854
 855    def _write(self, source_fd, output):
 856        while True:
 857            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
 858            if not data:
 859                break
 860            output.write(data)
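
    # Sketch: _write() streams the source in KEEP_BLOCK_SIZE chunks (64 MiB in
    # the Python SDK), so memory usage stays bounded regardless of file size;
    # the collection file object takes care of packing and uploading blocks.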
 861
 862    def _my_collection(self):
 863        return self._remote_collection if self.update else self._local_collection
 864
 865    def _get_cache_filepath(self):
 866        # Set up cache file name from input paths.
 867        md5 = hashlib.md5()
 868        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
 869        realpaths = sorted(os.path.realpath(path) for path in self.paths)
 870        md5.update(b'\0'.join([p.encode() for p in realpaths]))
 871        if self.filename:
 872            md5.update(self.filename.encode())
 873        cache_path = Path(self.CACHE_DIR)
 874        if len(cache_path.parts) == 1:
 875            cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path)
 876        else:
 877            # Note this is a noop if cache_path is absolute, which is what we want.
 878            cache_path = Path.home() / cache_path
 879            cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
 880        return str(cache_path / md5.hexdigest())
 881
 882    def _setup_state(self, update_collection):
 883        """
 884        Create a new cache file or load a previously existing one.
 885        """
 886        # Load an already existing collection for update
 887        if update_collection and re.match(arvados.util.collection_uuid_pattern,
 888                                          update_collection):
 889            try:
 890                self._remote_collection = arvados.collection.Collection(
 891                    update_collection,
 892                    api_client=self._api_client,
 893                    storage_classes_desired=self.storage_classes,
 894                    num_retries=self.num_retries)
 895            except arvados.errors.ApiError as error:
 896                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
 897            else:
 898                self.update = True
 899        elif update_collection:
 900            # Collection locator provided, but unknown format
 901            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 902
 903        if self.use_cache:
 904            cache_filepath = self._get_cache_filepath()
 905            if self.resume and os.path.exists(cache_filepath):
 906                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
 907                self._cache_file = open(cache_filepath, 'a+')
 908            else:
  909                # --no-resume means start with an empty cache file.
 910                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
 911                self._cache_file = open(cache_filepath, 'w+')
 912            self._cache_filename = self._cache_file.name
 913            self._lock_file(self._cache_file)
 914            self._cache_file.seek(0)
 915
 916        with self._state_lock:
 917            if self.use_cache:
 918                try:
 919                    self._state = json.load(self._cache_file)
 920                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
 921                        # Cache at least partially incomplete, set up new cache
 922                        self._state = copy.deepcopy(self.EMPTY_STATE)
 923                except ValueError:
 924                    # Cache file empty, set up new cache
 925                    self._state = copy.deepcopy(self.EMPTY_STATE)
 926            else:
 927                self.logger.info("No cache usage requested for this run.")
 928                # No cache file, set empty state
 929                self._state = copy.deepcopy(self.EMPTY_STATE)
 930            if not self._cached_manifest_valid():
 931                if not self.batch_mode:
 932                    raise ResumeCacheInvalidError()
 933                else:
  934                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyway.".format(self._cache_file.name))
 935                    self.use_cache = False # Don't overwrite preexisting cache file.
 936                    self._state = copy.deepcopy(self.EMPTY_STATE)
 937            # Load the previous manifest so we can check if files were modified remotely.
 938            self._local_collection = arvados.collection.Collection(
 939                self._state['manifest'],
 940                replication_desired=self.replication_desired,
 941                storage_classes_desired=self.storage_classes,
 942                put_threads=self.put_threads,
 943                api_client=self._api_client,
 944                num_retries=self.num_retries)
 945
 946    def _cached_manifest_valid(self):
 947        """
  948        Validate the oldest non-expired block signature to check whether the cached
  949        manifest is usable, i.e. that it was not created with a different
  950        Arvados account.
 951        """
 952        if self._state.get('manifest', None) is None:
 953            # No cached manifest yet, all good.
 954            return True
 955        now = datetime.datetime.utcnow()
 956        oldest_exp = None
 957        oldest_loc = None
 958        block_found = False
 959        for m in arvados.util.keep_locator_pattern.finditer(self._state['manifest']):
 960            loc = m.group(0)
 961            try:
 962                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
 963            except IndexError:
 964                # Locator without signature
 965                continue
 966            block_found = True
 967            if exp > now and (oldest_exp is None or exp < oldest_exp):
 968                oldest_exp = exp
 969                oldest_loc = loc
 970        if not block_found:
 971            # No block signatures found => no invalid block signatures.
 972            return True
 973        if oldest_loc is None:
 974            # Locator signatures found, but all have expired.
 975            # Reset the cache and move on.
 976            self.logger.info('Cache expired, starting from scratch.')
 977            self._state['manifest'] = ''
 978            return True
 979        kc = arvados.KeepClient(api_client=self._api_client,
 980                                num_retries=self.num_retries)
 981        try:
 982            kc.head(oldest_loc)
 983        except arvados.errors.KeepRequestError:
 984            # Something is wrong, cached manifest is not valid.
 985            return False
 986        return True
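
    # For reference (informational sketch): signed Keep locators look roughly
    # like 'd41d8cd98f00b204e9800998ecf8427e+0+A<hex signature>@<hex expiry>',
    # which is why _cached_manifest_valid() splits on '@' and parses the
    # trailing hex digits as the signature's expiry timestamp.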
 987
 988    def collection_file_paths(self, col, path_prefix='.'):
  989        """Return a list of file paths by recursively going through the entire collection `col`."""
 990        file_paths = []
 991        for name, item in col.items():
 992            if isinstance(item, arvados.arvfile.ArvadosFile):
 993                file_paths.append(os.path.join(path_prefix, name))
 994            elif isinstance(item, arvados.collection.Subcollection):
 995                new_prefix = os.path.join(path_prefix, name)
 996                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
 997        return file_paths
 998
 999    def _lock_file(self, fileobj):
1000        try:
1001            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
1002        except IOError:
1003            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
1004
1005    def _save_state(self):
1006        """
1007        Atomically save current state into cache.
1008        """
1009        with self._state_lock:
1010            # We're not using copy.deepcopy() here because it's a lot slower
1011            # than json.dumps(), and we already need the JSON format to be
1012            # saved on disk.
1013            state = json.dumps(self._state)
1014        try:
1015            new_cache = tempfile.NamedTemporaryFile(
1016                mode='w+',
1017                dir=os.path.dirname(self._cache_filename), delete=False)
1018            self._lock_file(new_cache)
1019            new_cache.write(state)
1020            new_cache.flush()
1021            os.fsync(new_cache)
1022            os.rename(new_cache.name, self._cache_filename)
1023        except (IOError, OSError, ResumeCacheConflict) as error:
1024            self.logger.error("There was a problem while saving the cache file: {}".format(error))
1025            try:
1026                os.unlink(new_cache.name)
1027            except NameError:  # NamedTemporaryFile() failed.
1028                pass
1029        else:
1030            self._cache_file.close()
1031            self._cache_file = new_cache
1032
1033    def collection_name(self):
1034        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1035
1036    def collection_trash_at(self):
1037        return self._my_collection().get_trash_at()
1038
1039    def manifest_locator(self):
1040        return self._my_collection().manifest_locator()
1041
1042    def portable_data_hash(self):
1043        pdh = self._my_collection().portable_data_hash()
1044        m = self._my_collection().stripped_manifest().encode()
1045        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1046        if pdh != local_pdh:
1047            self.logger.warning("\n".join([
1048                "arv-put: API server provided PDH differs from local manifest.",
1049                "         This should not happen; showing API server version."]))
1050        return pdh
1051
1052    def manifest_text(self, stream_name=".", strip=False, normalize=False):
1053        return self._my_collection().manifest_text(stream_name, strip, normalize)
1054
1055    def _datablocks_on_item(self, item):
1056        """
1057        Return a list of datablock locators, recursively navigating
1058        through subcollections
1059        """
1060        if isinstance(item, arvados.arvfile.ArvadosFile):
1061            if item.size() == 0:
1062                # Empty file locator
1063                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1064            else:
1065                locators = []
1066                for segment in item.segments():
1067                    loc = segment.locator
1068                    locators.append(loc)
1069                return locators
1070        elif isinstance(item, arvados.collection.Collection):
1071            l = [self._datablocks_on_item(x) for x in item.values()]
1072            # Fast list flattener method taken from:
1073            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1074            return [loc for sublist in l for loc in sublist]
1075        else:
1076            return None
1077
1078    def data_locators(self):
1079        with self._collection_lock:
1080            # Make sure all datablocks are flushed before getting the locators
1081            self._my_collection().manifest_text()
1082            datablocks = self._datablocks_on_item(self._my_collection())
1083        return datablocks
1084
1085_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1086                                                            os.getpid())
1087
1088# Simulate glob.glob() matching behavior without the need to scan the filesystem
1089# Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1090# pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1091# so instead we're using it on every path component.
1092def pathname_match(pathname, pattern):
1093    name = pathname.split(os.sep)
1094    # Fix patterns like 'some/subdir/' or 'some//subdir'
1095    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1096    if len(name) != len(pat):
1097        return False
1098    for i in range(len(name)):
1099        if not fnmatch.fnmatch(name[i], pat[i]):
1100            return False
1101    return True
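
# Illustrative examples of pathname_match(), assuming os.sep == '/':
#   pathname_match('subdir/file.txt', 'subdir/*.txt')        -> True
#   pathname_match('subdir/deep/file.txt', 'subdir/*.txt')   -> False (component counts differ)
#   pathname_match('exclude_this.gif', './exclude_this.gif') -> True ('.' components are dropped)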
1102
1103def machine_progress(bytes_written, bytes_expected):
1104    return _machine_format.format(
1105        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1106
1107def human_progress(bytes_written, bytes_expected):
1108    if bytes_expected:
1109        return "\r{}M / {}M {:.1%} ".format(
1110            bytes_written >> 20, bytes_expected >> 20,
1111            float(bytes_written) / bytes_expected)
1112    else:
1113        return "\r{} ".format(bytes_written)
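
# Example progress lines (sketch): with 512 MiB written out of 1 GiB expected,
# human_progress() returns "\r512M / 1024M 50.0% ", while machine_progress()
# returns something like "arv-put 1234: 536870912 written 1073741824 total\n"
# (the program name and pid come from sys.argv[0] and os.getpid()).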
1114
1115def progress_writer(progress_func, outfile=sys.stderr):
1116    def write_progress(bytes_written, bytes_expected):
1117        outfile.write(progress_func(bytes_written, bytes_expected))
1118    return write_progress
1119
1120def desired_project_uuid(api_client, project_uuid, num_retries):
1121    if not project_uuid:
1122        query = api_client.users().current()
1123    elif arvados.util.user_uuid_pattern.match(project_uuid):
1124        query = api_client.users().get(uuid=project_uuid)
1125    elif arvados.util.group_uuid_pattern.match(project_uuid):
1126        query = api_client.groups().get(uuid=project_uuid)
1127    else:
1128        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1129    return query.execute(num_retries=num_retries)['uuid']
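
# Note: the UUID type prefixes checked above are 'tpzed' for users and 'j7d0g'
# for groups, e.g. a hypothetical 'zzzzz-tpzed-0123456789abcde' selects that
# user's home project while 'zzzzz-j7d0g-0123456789abcde' selects the project
# itself (both example UUIDs are made up).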
1130
1131def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1132         install_sig_handlers=True):
1133    global api_client
1134
1135    args = parse_arguments(arguments)
1136    logger = logging.getLogger('arvados.arv_put')
1137    if args.silent:
1138        logger.setLevel(logging.WARNING)
1139    else:
1140        logger.setLevel(logging.INFO)
1141    status = 0
1142
1143    request_id = arvados.util.new_request_id()
1144
1145    formatter = ArvPutLogFormatter(request_id)
1146    logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1147
1148    if api_client is None:
1149        api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
1150
1151    if install_sig_handlers:
1152        arv_cmd.install_signal_handlers()
1153
1154    # Trash arguments validation
1155    trash_at = None
1156    if args.trash_at is not None:
1157        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1158        # make sure the user provides a complete YYYY-MM-DD date.
1159        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1160            logger.error("--trash-at argument format invalid, use --help to see examples.")
1161            sys.exit(1)
1162        # Check if no time information was provided. In that case, assume end-of-day.
1163        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1164            args.trash_at += 'T23:59:59'
1165        try:
1166            trash_at = ciso8601.parse_datetime(args.trash_at)
1167        except:
1168            logger.error("--trash-at argument format invalid, use --help to see examples.")
1169            sys.exit(1)
1170        else:
1171            if trash_at.tzinfo is not None:
1172                # Timezone aware datetime provided.
1173                utcoffset = -trash_at.utcoffset()
1174            else:
1175                # Timezone-naive datetime provided. Assume it is local.
1176                if time.daylight:
1177                    utcoffset = datetime.timedelta(seconds=time.altzone)
1178                else:
1179                    utcoffset = datetime.timedelta(seconds=time.timezone)
1180            # Convert to UTC timezone naive datetime.
1181            trash_at = trash_at.replace(tzinfo=None) + utcoffset
1182
1183        if trash_at <= datetime.datetime.utcnow():
1184            logger.error("--trash-at argument must be set in the future")
1185            sys.exit(1)
1186    if args.trash_after is not None:
1187        if args.trash_after < 1:
1188            logger.error("--trash-after argument must be >= 1")
1189            sys.exit(1)
1190        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1191
1192    # Determine the name to use
1193    if args.name:
1194        if args.stream or args.raw:
1195            logger.error("Cannot use --name with --stream or --raw")
1196            sys.exit(1)
1197        elif args.update_collection:
1198            logger.error("Cannot use --name with --update-collection")
1199            sys.exit(1)
1200        collection_name = args.name
1201    else:
1202        collection_name = "Saved at {} by {}@{}".format(
1203            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1204            pwd.getpwuid(os.getuid()).pw_name,
1205            socket.gethostname())
1206
1207    if args.project_uuid and (args.stream or args.raw):
1208        logger.error("Cannot use --project-uuid with --stream or --raw")
1209        sys.exit(1)
1210
1211    # Determine the parent project
1212    try:
1213        project_uuid = desired_project_uuid(api_client, args.project_uuid,
1214                                            args.retries)
1215    except (apiclient_errors.Error, ValueError) as error:
1216        logger.error(error)
1217        sys.exit(1)
1218
1219    if args.progress:
1220        reporter = progress_writer(human_progress)
1221    elif args.batch_progress:
1222        reporter = progress_writer(machine_progress)
1223    else:
1224        reporter = None
1225
1226    # Set up the exclude regexes from all the --exclude arguments provided
1227    name_patterns = []
1228    exclude_paths = []
1229    exclude_names = None
1230    if len(args.exclude) > 0:
1231        # We're supporting 2 kinds of exclusion patterns:
1232        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1233        #                            the name, wherever the file is on the tree)
1234        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1235        #                            entire path, and should be relative to
1236        #                            any input dir argument)
1237        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1238        #                            placed directly underneath the input dir)
1239        for p in args.exclude:
1240            # Only relative path patterns are allowed
1241            if p.startswith(os.sep):
1242                logger.error("Cannot use absolute paths with --exclude")
1243                sys.exit(1)
1244            if os.path.dirname(p):
1245                # We don't support path patterns with '..'
1246                p_parts = p.split(os.sep)
1247                if '..' in p_parts:
1248                    logger.error(
1249                        "Cannot use path patterns that include '..'")
1250                    sys.exit(1)
1251                # Path search pattern
1252                exclude_paths.append(p)
1253            else:
1254                # Name-only search pattern
1255                name_patterns.append(p)
1256        # For name only matching, we can combine all patterns into a single
1257        # regexp, for better performance.
1258        exclude_names = re.compile('|'.join(
1259            [fnmatch.translate(p) for p in name_patterns]
1260        )) if len(name_patterns) > 0 else None
1261        # Show the user the patterns to be used, just in case they weren't
1262        # specified inside quotes and got changed by the shell expansion.
1263        logger.info("Exclude patterns: {}".format(args.exclude))
1264
1265    # If this is used by a human, and there's at least one directory to be
1266    # uploaded, the expected bytes calculation can take a moment.
1267    if args.progress and any([os.path.isdir(f) for f in args.paths]):
1268        logger.info("Calculating upload size, this could take some time...")
1269    try:
1270        writer = ArvPutUploadJob(paths = args.paths,
1271                                 resume = args.resume,
1272                                 use_cache = args.use_cache,
1273                                 batch_mode= args.batch,
1274                                 filename = args.filename,
1275                                 reporter = reporter,
1276                                 api_client = api_client,
1277                                 num_retries = args.retries,
1278                                 replication_desired = args.replication,
1279                                 put_threads = args.threads,
1280                                 name = collection_name,
1281                                 owner_uuid = project_uuid,
1282                                 ensure_unique_name = True,
1283                                 update_collection = args.update_collection,
1284                                 storage_classes=args.storage_classes,
1285                                 logger=logger,
1286                                 dry_run=args.dry_run,
1287                                 follow_links=args.follow_links,
1288                                 exclude_paths=exclude_paths,
1289                                 exclude_names=exclude_names,
1290                                 trash_at=trash_at)
1291    except ResumeCacheConflict:
1292        logger.error("\n".join([
1293            "arv-put: Another process is already uploading this data.",
1294            "         Use --no-cache if this is really what you want."]))
1295        sys.exit(1)
1296    except ResumeCacheInvalidError:
1297        logger.error("\n".join([
1298            "arv-put: Resume cache contains invalid signature: it may have expired",
1299            "         or been created with another Arvados user's credentials.",
1300            "         Switch user or use one of the following options to restart upload:",
1301            "         --no-resume to start a new resume cache.",
1302            "         --no-cache to disable resume cache.",
1303            "         --batch to ignore the resume cache if invalid."]))
1304        sys.exit(1)
1305    except (CollectionUpdateError, PathDoesNotExistError) as error:
1306        logger.error("\n".join([
1307            "arv-put: %s" % str(error)]))
1308        sys.exit(1)
1309    except ArvPutUploadIsPending:
1310        # Dry run check successful, return proper exit code.
1311        sys.exit(2)
1312    except ArvPutUploadNotPending:
1313        # No files pending for upload
1314        sys.exit(0)
1315
1316    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1317        logger.warning("\n".join([
1318            "arv-put: Resuming previous upload from last checkpoint.",
1319            "         Use the --no-resume option to start over."]))
1320
1321    if not args.dry_run:
1322        writer.report_progress()
1323    output = None
1324    try:
1325        writer.start(save_collection=not(args.stream or args.raw))
1326    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
1327        logger.error("\n".join([
1328            "arv-put: %s" % str(error)]))
1329        sys.exit(1)
1330
1331    if args.progress:  # Print newline to split stderr from stdout for humans.
1332        logger.info("\n")
1333
1334    if args.stream:
1335        if args.normalize:
1336            output = writer.manifest_text(normalize=True)
1337        else:
1338            output = writer.manifest_text()
1339    elif args.raw:
1340        output = ','.join(writer.data_locators())
1341    elif writer.manifest_locator() is not None:
1342        try:
1343            expiration_notice = ""
1344            if writer.collection_trash_at() is not None:
1345                # Get the local timezone-naive version, and log it with timezone information.
1346                if time.daylight:
1347                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1348                else:
1349                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1350                expiration_notice = ". It will expire on {} {}.".format(
1351                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1352            if args.update_collection:
1353                logger.info(u"Collection updated: '{}'{}".format(
1354                    writer.collection_name(), expiration_notice))
1355            else:
1356                logger.info(u"Collection saved as '{}'{}".format(
1357                    writer.collection_name(), expiration_notice))
1358            if args.portable_data_hash:
1359                output = writer.portable_data_hash()
1360            else:
1361                output = writer.manifest_locator()
1362        except apiclient_errors.Error as error:
1363            logger.error(
1364                "arv-put: Error creating Collection on project: {}.".format(
1365                    error))
1366            status = 1
1367    else:
1368        status = 1
1369
1370    # Print the locator (uuid) of the new collection.
1371    if output is None:
1372        status = status or 1
1373    elif not args.silent:
1374        stdout.write(output)
1375        if not output.endswith('\n'):
1376            stdout.write('\n')
1377
1378    if install_sig_handlers:
1379        arv_cmd.restore_signal_handlers()
1380
1381    if status != 0:
1382        sys.exit(status)
1383
1384    # Success!
1385    return output
1386
1387
1388if __name__ == '__main__':
1389    main()
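
The exclusion handling in main() above distinguishes name-only patterns (e.g. '*.jpg') from path patterns (e.g. 'foo/bar'), and folds all name-only patterns into one compiled regular expression. A minimal standalone sketch of that fnmatch-to-regex combination (the pattern list is illustrative, not taken from a real invocation):

    import fnmatch
    import re

    # Hypothetical --exclude values; only name-only patterns are combined this way.
    name_patterns = ['*.jpg', '*.tmp']
    exclude_names = re.compile('|'.join(
        fnmatch.translate(p) for p in name_patterns
    ))

    # fnmatch.translate() anchors each pattern, so match() tests the whole name.
    assert exclude_names.match('photo.jpg')
    assert exclude_names.match('scratch.tmp')
    assert exclude_names.match('notes.txt') is None

Path patterns (those containing a directory separator) are kept as-is in exclude_paths and matched against each file's path relative to the input directory, as shown in the ArvPutUploadJob source below.
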
api_client = None
upload_opts = ArgumentParser(prog='pysdk_pdoc.py', usage=None, description=None, formatter_class=<class 'argparse.HelpFormatter'>, conflict_handler='error', add_help=False)
run_opts = ArgumentParser(prog='pysdk_pdoc.py', usage=None, description=None, formatter_class=<class 'argparse.HelpFormatter'>, conflict_handler='error', add_help=False)
arg_parser = ArgumentParser(prog='pysdk_pdoc.py', usage=None, description='Copy data from the local filesystem to Keep.', formatter_class=<class 'argparse.HelpFormatter'>, conflict_handler='error', add_help=True)
def parse_arguments(arguments):
264def parse_arguments(arguments):
265    args = arg_parser.parse_args(arguments)
266
267    if len(args.paths) == 0:
268        args.paths = ['-']
269
270    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
271
272    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
273        arg_parser.error("""
274    --filename argument cannot be used when storing a directory or
275    multiple files.
276    """)
277
278    # Turn on --progress by default if stderr is a tty.
279    if (not (args.batch_progress or args.no_progress or args.silent)
280        and os.isatty(sys.stderr.fileno())):
281        args.progress = True
282
283    # Turn off --resume (default) if --no-cache is used.
284    if not args.use_cache:
285        args.resume = False
286
287    if args.paths == ['-']:
288        if args.update_collection:
289            arg_parser.error("""
290    --update-collection cannot be used when reading from stdin.
291    """)
292        args.resume = False
293        args.use_cache = False
294        if not args.filename:
295            args.filename = 'stdin'
296
297    # Remove possible duplicated patterns
298    if len(args.exclude) > 0:
299        args.exclude = list(set(args.exclude))
300
301    return args
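
A minimal usage sketch for parse_arguments() (the path is illustrative; with no paths given, stdin is assumed, the resume cache is disabled, and the output file name defaults to 'stdin'):

    # Hypothetical invocation: upload a single local directory.
    args = parse_arguments(['/tmp/dataset/'])
    assert args.paths == ['/tmp/dataset/']

    # Reading from stdin (no paths) turns off caching and names the file 'stdin'.
    args = parse_arguments([])
    assert args.paths == ['-']
    assert args.filename == 'stdin'
    assert args.use_cache is False
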
class PathDoesNotExistError(builtins.Exception):
304class PathDoesNotExistError(Exception):
305    pass

Common base class for all non-exit exceptions.

class CollectionUpdateError(builtins.Exception):
308class CollectionUpdateError(Exception):
309    pass

Common base class for all non-exit exceptions.

class ResumeCacheConflict(builtins.Exception):
312class ResumeCacheConflict(Exception):
313    pass

Common base class for all non-exit exceptions.

class ResumeCacheInvalidError(builtins.Exception):
316class ResumeCacheInvalidError(Exception):
317    pass

Common base class for all non-exit exceptions.

class ArvPutArgumentConflict(builtins.Exception):
319class ArvPutArgumentConflict(Exception):
320    pass

Common base class for all non-exit exceptions.

class ArvPutUploadIsPending(builtins.Exception):
323class ArvPutUploadIsPending(Exception):
324    pass

Common base class for all non-exit exceptions.

class ArvPutUploadNotPending(builtins.Exception):
327class ArvPutUploadNotPending(Exception):
328    pass

Common base class for all non-exit exceptions.
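
main() above maps these two dry-run outcomes to process exit codes: 2 when files are pending upload, 0 when nothing needs uploading. A minimal sketch of the same convention, assuming a default Arvados client configuration is available:

    import sys

    def dry_run_check(paths):
        # Hypothetical helper; with dry_run=True, ArvPutUploadJob raises one of
        # the two exceptions while it is being constructed.
        try:
            ArvPutUploadJob(paths=paths, dry_run=True)
        except ArvPutUploadIsPending:
            sys.exit(2)   # at least one file would be uploaded
        except ArvPutUploadNotPending:
            sys.exit(0)   # nothing to upload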

class FileUploadList(builtins.list):
331class FileUploadList(list):
332    def __init__(self, dry_run=False):
333        list.__init__(self)
334        self.dry_run = dry_run
335
336    def append(self, other):
337        if self.dry_run:
338            raise ArvPutUploadIsPending()
339        super(FileUploadList, self).append(other)

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

FileUploadList(dry_run=False)
332    def __init__(self, dry_run=False):
333        list.__init__(self)
334        self.dry_run = dry_run
dry_run
def append(self, other):
336    def append(self, other):
337        if self.dry_run:
338            raise ArvPutUploadIsPending()
339        super(FileUploadList, self).append(other)

Append object to the end of the list.
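
A minimal sketch of the dry-run behaviour above (the tuple is illustrative; ArvPutUploadJob appends (source, resume_offset, filename) tuples to this list):

    files = FileUploadList(dry_run=True)
    try:
        files.append(('/tmp/example.txt', 0, 'example.txt'))
    except ArvPutUploadIsPending:
        print("dry run: at least one file would be uploaded")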

class ArvPutLogFormatter(logging.Formatter):
343class ArvPutLogFormatter(logging.Formatter):
344    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
345    err_fmtr = None
346    request_id_informed = False
347
348    def __init__(self, request_id):
349        self.err_fmtr = logging.Formatter(
350            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
351            arvados.log_date_format)
352
353    def format(self, record):
354        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
355            self.request_id_informed = True
356            return self.err_fmtr.format(record)
357        return self.std_fmtr.format(record)

Formatter instances are used to convert a LogRecord to text.

Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, “%(message)s”, “{message}”, or “${message}”, is used.

The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user’s message and arguments are pre- formatted into a LogRecord’s message attribute. Currently, the useful attributes in a LogRecord are described by:

%(name)s            Name of the logger (logging channel)
%(levelno)s         Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL)
%(levelname)s       Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
%(pathname)s        Full pathname of the source file where the logging call was issued (if available)
%(filename)s        Filename portion of pathname
%(module)s          Module (name portion of filename)
%(lineno)d          Source line number where the logging call was issued (if available)
%(funcName)s        Function name
%(created)f         Time when the LogRecord was created (time.time() return value)
%(asctime)s         Textual time when the LogRecord was created
%(msecs)d           Millisecond portion of the creation time
%(relativeCreated)d Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time)
%(thread)d          Thread ID (if available)
%(threadName)s      Thread name (if available)
%(process)d         Process ID (if available)
%(message)s         The result of record.getMessage(), computed just as the record is emitted

ArvPutLogFormatter(request_id)
348    def __init__(self, request_id):
349        self.err_fmtr = logging.Formatter(
350            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
351            arvados.log_date_format)

Initialize the formatter with specified format strings.

Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.

Use a style parameter of ‘%’, ‘{’ or ‘$’ to specify that you want to use one of %-formatting, str.format() ({}) formatting or string.Template formatting in your format string.

Changed in version 3.2: Added the style parameter.

std_fmtr = <logging.Formatter object>
err_fmtr = None
request_id_informed = False
def format(self, record):
353    def format(self, record):
354        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
355            self.request_id_informed = True
356            return self.err_fmtr.format(record)
357        return self.std_fmtr.format(record)

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
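
A minimal sketch of wiring this formatter into the 'arvados.arv_put' logger (the request id is a placeholder, not a real X-Request-Id):

    import logging

    handler = logging.StreamHandler()
    handler.setFormatter(ArvPutLogFormatter(request_id='req-xxxxxxxxxxxxxxxxxxxx'))

    logger = logging.getLogger('arvados.arv_put')
    logger.addHandler(handler)
    # The first DEBUG or ERROR record carries the X-Request-Id suffix; after
    # that, records use the standard Arvados log format.
    logger.error("example error message")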

class ResumeCache:
360class ResumeCache(object):
361    CACHE_DIR = 'arv-put'
362
363    def __init__(self, file_spec):
364        self.cache_file = open(file_spec, 'a+')
365        self._lock_file(self.cache_file)
366        self.filename = self.cache_file.name
367
368    @classmethod
369    def make_path(cls, args):
370        md5 = hashlib.md5()
371        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
372        realpaths = sorted(os.path.realpath(path) for path in args.paths)
373        md5.update(b'\0'.join([p.encode() for p in realpaths]))
374        if any(os.path.isdir(path) for path in realpaths):
375            md5.update(b'-1')
376        elif args.filename:
377            md5.update(args.filename.encode())
378        cache_path = Path(cls.CACHE_DIR)
379        if len(cache_path.parts) == 1:
380            cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path)
381        else:
382            # Note this is a noop if cache_path is absolute, which is what we want.
383            cache_path = Path.home() / cache_path
384            cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
385        return str(cache_path / md5.hexdigest())
386
387    def _lock_file(self, fileobj):
388        try:
389            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
390        except IOError:
391            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
392
393    def load(self):
394        self.cache_file.seek(0)
395        return json.load(self.cache_file)
396
397    def check_cache(self, api_client=None, num_retries=0):
398        try:
399            state = self.load()
400            locator = None
401            try:
402                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
403                    locator = state["_finished_streams"][0][1][0]
404                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
405                    locator = state["_current_stream_locators"][0]
406                if locator is not None:
407                    kc = arvados.keep.KeepClient(api_client=api_client)
408                    kc.head(locator, num_retries=num_retries)
409            except Exception as e:
410                self.restart()
411        except (ValueError):
412            pass
413
414    def save(self, data):
415        try:
416            new_cache_fd, new_cache_name = tempfile.mkstemp(
417                dir=os.path.dirname(self.filename))
418            self._lock_file(new_cache_fd)
419            new_cache = os.fdopen(new_cache_fd, 'r+')
420            json.dump(data, new_cache)
421            os.rename(new_cache_name, self.filename)
422        except (IOError, OSError, ResumeCacheConflict):
423            try:
424                os.unlink(new_cache_name)
425            except NameError:  # mkstemp failed.
426                pass
427        else:
428            self.cache_file.close()
429            self.cache_file = new_cache
430
431    def close(self):
432        self.cache_file.close()
433
434    def destroy(self):
435        try:
436            os.unlink(self.filename)
437        except OSError as error:
438            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
439                raise
440        self.close()
441
442    def restart(self):
443        self.destroy()
444        self.__init__(self.filename)
ResumeCache(file_spec)
363    def __init__(self, file_spec):
364        self.cache_file = open(file_spec, 'a+')
365        self._lock_file(self.cache_file)
366        self.filename = self.cache_file.name
CACHE_DIR = 'arv-put'
cache_file
filename
@classmethod
def make_path(cls, args):
368    @classmethod
369    def make_path(cls, args):
370        md5 = hashlib.md5()
371        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
372        realpaths = sorted(os.path.realpath(path) for path in args.paths)
373        md5.update(b'\0'.join([p.encode() for p in realpaths]))
374        if any(os.path.isdir(path) for path in realpaths):
375            md5.update(b'-1')
376        elif args.filename:
377            md5.update(args.filename.encode())
378        cache_path = Path(cls.CACHE_DIR)
379        if len(cache_path.parts) == 1:
380            cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path)
381        else:
382            # Note this is a noop if cache_path is absolute, which is what we want.
383            cache_path = Path.home() / cache_path
384            cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
385        return str(cache_path / md5.hexdigest())
def load(self):
393    def load(self):
394        self.cache_file.seek(0)
395        return json.load(self.cache_file)
def check_cache(self, api_client=None, num_retries=0):
397    def check_cache(self, api_client=None, num_retries=0):
398        try:
399            state = self.load()
400            locator = None
401            try:
402                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
403                    locator = state["_finished_streams"][0][1][0]
404                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
405                    locator = state["_current_stream_locators"][0]
406                if locator is not None:
407                    kc = arvados.keep.KeepClient(api_client=api_client)
408                    kc.head(locator, num_retries=num_retries)
409            except Exception as e:
410                self.restart()
411        except (ValueError):
412            pass
def save(self, data):
414    def save(self, data):
415        try:
416            new_cache_fd, new_cache_name = tempfile.mkstemp(
417                dir=os.path.dirname(self.filename))
418            self._lock_file(new_cache_fd)
419            new_cache = os.fdopen(new_cache_fd, 'r+')
420            json.dump(data, new_cache)
421            os.rename(new_cache_name, self.filename)
422        except (IOError, OSError, ResumeCacheConflict):
423            try:
424                os.unlink(new_cache_name)
425            except NameError:  # mkstemp failed.
426                pass
427        else:
428            self.cache_file.close()
429            self.cache_file = new_cache
def close(self):
431    def close(self):
432        self.cache_file.close()
def destroy(self):
434    def destroy(self):
435        try:
436            os.unlink(self.filename)
437        except OSError as error:
438            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
439                raise
440        self.close()
def restart(self):
442    def restart(self):
443        self.destroy()
444        self.__init__(self.filename)
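
A minimal usage sketch for ResumeCache (args is assumed to come from parse_arguments(); make_path() derives the cache file name from the API host, the input paths and, for single-file uploads, the --filename value):

    args = parse_arguments(['/tmp/dataset/'])
    cache = ResumeCache(ResumeCache.make_path(args))  # raises ResumeCacheConflict if already locked
    try:
        state = cache.load()            # previously saved JSON state, if any
    except ValueError:
        state = {}                      # empty or brand-new cache file
    state['example_key'] = 'example'    # hypothetical payload, for illustration only
    cache.save(state)                   # atomically replaces the cache file
    cache.close()
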
class ArvPutUploadJob:
 447class ArvPutUploadJob(object):
 448    CACHE_DIR = 'arv-put'
 449    EMPTY_STATE = {
 450        'manifest' : None, # Last saved manifest checkpoint
 451        'files' : {} # Previous run file list: {path : {size, mtime}}
 452    }
 453
 454    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
 455                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
 456                 ensure_unique_name=False, num_retries=None,
 457                 put_threads=None, replication_desired=None, filename=None,
 458                 update_time=60.0, update_collection=None, storage_classes=None,
 459                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
 460                 follow_links=True, exclude_paths=[], exclude_names=None,
 461                 trash_at=None):
 462        self.paths = paths
 463        self.resume = resume
 464        self.use_cache = use_cache
 465        self.batch_mode = batch_mode
 466        self.update = False
 467        self.reporter = reporter
 468        # This will be set to 0 before we start counting, if no special files
 469        # are going to be read.
 470        self.bytes_expected = None
 471        self.bytes_written = 0
 472        self.bytes_skipped = 0
 473        self.name = name
 474        self.owner_uuid = owner_uuid
 475        self.ensure_unique_name = ensure_unique_name
 476        self.num_retries = num_retries
 477        self.replication_desired = replication_desired
 478        self.put_threads = put_threads
 479        self.filename = filename
 480        self.storage_classes = storage_classes
 481        self._api_client = api_client
 482        self._state_lock = threading.Lock()
 483        self._state = None # Previous run state (file list & manifest)
 484        self._current_files = [] # Current run file list
 485        self._cache_file = None
 486        self._collection_lock = threading.Lock()
 487        self._remote_collection = None # Collection being updated (if asked)
 488        self._local_collection = None # Collection from previous run manifest
 489        self._file_paths = set() # Files to be updated in remote collection
 490        self._stop_checkpointer = threading.Event()
 491        self._checkpointer = threading.Thread(target=self._update_task)
 492        self._checkpointer.daemon = True
 493        self._update_task_time = update_time  # How many seconds to wait between update runs
 494        self._files_to_upload = FileUploadList(dry_run=dry_run)
 495        self._upload_started = False
 496        self.logger = logger
 497        self.dry_run = dry_run
 498        self._checkpoint_before_quit = True
 499        self.follow_links = follow_links
 500        self.exclude_paths = exclude_paths
 501        self.exclude_names = exclude_names
 502        self._trash_at = trash_at
 503
 504        if self._trash_at is not None:
 505            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
 506                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
 507            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
 508                raise TypeError('provided trash_at datetime should be timezone-naive')
 509
 510        if not self.use_cache and self.resume:
 511            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
 512
 513        # Check for obvious dry-run responses
 514        if self.dry_run and (not self.use_cache or not self.resume):
 515            raise ArvPutUploadIsPending()
 516
 517        # Load cached data if any and if needed
 518        self._setup_state(update_collection)
 519
 520        # Build the upload file list, excluding requested files and counting the
 521        # bytes expected to be uploaded.
 522        self._build_upload_list()
 523
 524    def _build_upload_list(self):
 525        """
 526        Scan the requested paths to count file sizes, excluding requested files
 527        and dirs and building the upload file list.
 528        """
 529        # If there aren't special files to be read, reset total bytes count to zero
 530        # to start counting.
 531        if not any([p for p in self.paths
 532                    if not (os.path.isfile(p) or os.path.isdir(p))]):
 533            self.bytes_expected = 0
 534
 535        for path in self.paths:
 536            # Test for stdin first, in case a file named '-' exists
 537            if path == '-':
 538                if self.dry_run:
 539                    raise ArvPutUploadIsPending()
 540                self._write_stdin(self.filename or 'stdin')
 541            elif not os.path.exists(path):
 542                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
 543            elif (not self.follow_links) and os.path.islink(path):
 544                self.logger.warning("Skipping symlink '{}'".format(path))
 545                continue
 546            elif os.path.isdir(path):
 547                # Use absolute paths on cache index so CWD doesn't interfere
 548                # with the caching logic.
 549                orig_path = path
 550                path = os.path.abspath(path)
 551                if orig_path[-1:] == os.sep:
 552                    # When passing a directory reference with a trailing slash,
 553                    # its contents should be uploaded directly to the
 554                    # collection's root.
 555                    prefixdir = path
 556                else:
 557                    # When passing a directory reference with no trailing slash,
 558                    # upload the directory to the collection's root.
 559                    prefixdir = os.path.dirname(path)
 560                prefixdir += os.sep
 561                for root, dirs, files in os.walk(path,
 562                                                 followlinks=self.follow_links):
 563                    root_relpath = os.path.relpath(root, path)
 564                    if root_relpath == '.':
 565                        root_relpath = ''
 566                    # Exclude files/dirs by full path matching pattern
 567                    if self.exclude_paths:
 568                        dirs[:] = [d for d in dirs
 569                                   if not any(pathname_match(
 570                                           os.path.join(root_relpath, d), pat)
 571                                              for pat in self.exclude_paths)]
 572                        files = [f for f in files
 573                                 if not any(pathname_match(
 574                                         os.path.join(root_relpath, f), pat)
 575                                            for pat in self.exclude_paths)]
 576                    # Exclude files/dirs by name matching pattern
 577                    if self.exclude_names is not None:
 578                        dirs[:] = [d for d in dirs
 579                                   if not self.exclude_names.match(d)]
 580                        files = [f for f in files
 581                                 if not self.exclude_names.match(f)]
 582                    # Make os.walk()'s dir traversing order deterministic
 583                    dirs.sort()
 584                    files.sort()
 585                    for f in files:
 586                        filepath = os.path.join(root, f)
 587                        if not os.path.isfile(filepath):
 588                            self.logger.warning("Skipping non-regular file '{}'".format(filepath))
 589                            continue
 590                        # Add its size to the total bytes count (if applicable)
 591                        if self.follow_links or (not os.path.islink(filepath)):
 592                            if self.bytes_expected is not None:
 593                                self.bytes_expected += os.path.getsize(filepath)
 594                        self._check_file(filepath,
 595                                         os.path.join(root[len(prefixdir):], f))
 596            else:
 597                filepath = os.path.abspath(path)
 598                # Add its size to the total bytes count (if applicable)
 599                if self.follow_links or (not os.path.islink(filepath)):
 600                    if self.bytes_expected is not None:
 601                        self.bytes_expected += os.path.getsize(filepath)
 602                self._check_file(filepath,
 603                                 self.filename or os.path.basename(path))
 604        # If dry-run mode is on and we got this far, we should notify that
 605        # there aren't any files to upload.
 606        if self.dry_run:
 607            raise ArvPutUploadNotPending()
 608        # Remove local_collection's files that don't exist locally anymore, so the
 609        # bytes_written count is correct.
 610        for f in self.collection_file_paths(self._local_collection,
 611                                            path_prefix=""):
 612            if f != 'stdin' and f != self.filename and not f in self._file_paths:
 613                self._local_collection.remove(f)
 614
 615    def start(self, save_collection):
 616        """
 617        Start supporting thread & file uploading
 618        """
 619        self._checkpointer.start()
 620        try:
 621            # Update bytes_written from current local collection and
 622            # report initial progress.
 623            self._update()
 624            # Actual file upload
 625            self._upload_started = True # Used by the update thread to start checkpointing
 626            self._upload_files()
 627        except (SystemExit, Exception) as e:
 628            self._checkpoint_before_quit = False
 629            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
 630            # Note: We're expecting SystemExit instead of
 631            # KeyboardInterrupt because we have a custom signal
 632            # handler in place that raises SystemExit with the caught
 633            # signal's code.
 634            if isinstance(e, PathDoesNotExistError):
 635                # We aren't interested in the traceback for this case
 636                pass
 637            elif not isinstance(e, SystemExit) or e.code != -2:
 638                self.logger.warning("Abnormal termination:\n{}".format(
 639                    traceback.format_exc()))
 640            raise
 641        finally:
 642            if not self.dry_run:
 643                # Stop the thread before doing anything else
 644                self._stop_checkpointer.set()
 645                self._checkpointer.join()
 646                if self._checkpoint_before_quit:
 647                    # Commit all pending blocks & one last _update()
 648                    self._local_collection.manifest_text()
 649                    self._update(final=True)
 650                    if save_collection:
 651                        self.save_collection()
 652            if self.use_cache:
 653                self._cache_file.close()
 654
 655    def _collection_trash_at(self):
 656        """
 657        Returns the trash date that the collection should use at save time.
 658        Takes into account absolute/relative trash_at values requested
 659        by the user.
 660        """
 661        if type(self._trash_at) == datetime.timedelta:
 662            # Get an absolute datetime for trash_at
 663            return datetime.datetime.utcnow() + self._trash_at
 664        return self._trash_at
 665
 666    def save_collection(self):
 667        if self.update:
 668            # Check if files should be updated on the remote collection.
 669            for fp in self._file_paths:
 670                remote_file = self._remote_collection.find(fp)
 671                if not remote_file:
 672                    # File doesn't exist on the remote collection, copy it.
 673                    self._remote_collection.copy(fp, fp, self._local_collection)
 674                elif remote_file != self._local_collection.find(fp):
 675                    # A different file exists on the remote collection, overwrite it.
 676                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
 677                else:
 678                    # The file already exists on the remote collection, skip it.
 679                    pass
 680            self._remote_collection.save(num_retries=self.num_retries,
 681                                         trash_at=self._collection_trash_at())
 682        else:
 683            if len(self._local_collection) == 0:
 684                self.logger.warning("No files were uploaded, skipping collection creation.")
 685                return
 686            self._local_collection.save_new(
 687                name=self.name, owner_uuid=self.owner_uuid,
 688                ensure_unique_name=self.ensure_unique_name,
 689                num_retries=self.num_retries,
 690                trash_at=self._collection_trash_at())
 691
 692    def destroy_cache(self):
 693        if self.use_cache:
 694            try:
 695                os.unlink(self._cache_filename)
 696            except OSError as error:
 697                # That's what we wanted anyway.
 698                if error.errno != errno.ENOENT:
 699                    raise
 700            self._cache_file.close()
 701
 702    def _collection_size(self, collection):
 703        """
 704        Recursively get the total size of the collection
 705        """
 706        size = 0
 707        for item in collection.values():
 708            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
 709                size += self._collection_size(item)
 710            else:
 711                size += item.size()
 712        return size
 713
 714    def _update_task(self):
 715        """
 716        Periodically called support task. File uploading is
 717        asynchronous so we poll status from the collection.
 718        """
 719        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
 720            self._update()
 721
 722    def _update(self, final=False):
 723        """
 724        Update cached manifest text and report progress.
 725        """
 726        if self._upload_started:
 727            with self._collection_lock:
 728                self.bytes_written = self._collection_size(self._local_collection)
 729                if self.use_cache:
 730                    if final:
 731                        manifest = self._local_collection.manifest_text()
 732                    else:
 733                        # Get the manifest text without committing pending blocks
 734                        manifest = self._local_collection.manifest_text(strip=False,
 735                                                                        normalize=False,
 736                                                                        only_committed=True)
 737                    # Update cache
 738                    with self._state_lock:
 739                        self._state['manifest'] = manifest
 740            if self.use_cache:
 741                try:
 742                    self._save_state()
 743                except Exception as e:
 744                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
 745            # Keep remote collection's trash_at attribute synced when using relative expire dates
 746            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
 747                try:
 748                    self._api_client.collections().update(
 749                        uuid=self._remote_collection.manifest_locator(),
 750                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
 751                    ).execute(num_retries=self.num_retries)
 752                except Exception as e:
 753                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
 754        else:
 755            self.bytes_written = self.bytes_skipped
 756        # Call the reporter, if any
 757        self.report_progress()
 758
 759    def report_progress(self):
 760        if self.reporter is not None:
 761            self.reporter(self.bytes_written, self.bytes_expected)
 762
 763    def _write_stdin(self, filename):
 764        output = self._local_collection.open(filename, 'wb')
 765        self._write(sys.stdin.buffer, output)
 766        output.close()
 767
 768    def _check_file(self, source, filename):
 769        """
 770        Check if this file needs to be uploaded
 771        """
 772        # Ignore symlinks when requested
 773        if (not self.follow_links) and os.path.islink(source):
 774            return
 775        resume_offset = 0
 776        should_upload = False
 777        new_file_in_cache = False
 778        # Record file path for updating the remote collection before exiting
 779        self._file_paths.add(filename)
 780
 781        with self._state_lock:
 782            # If no previous cached data on this file, store it for an eventual
 783            # repeated run.
 784            if source not in self._state['files']:
 785                self._state['files'][source] = {
 786                    'mtime': os.path.getmtime(source),
 787                    'size' : os.path.getsize(source)
 788                }
 789                new_file_in_cache = True
 790            cached_file_data = self._state['files'][source]
 791
 792        # Check if file was already uploaded (at least partially)
 793        file_in_local_collection = self._local_collection.find(filename)
 794
 795        # If not resuming, upload the full file.
 796        if not self.resume:
 797            should_upload = True
 798        # New file detected from last run, upload it.
 799        elif new_file_in_cache:
 800            should_upload = True
 801        # Local file didn't change from last run.
 802        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
 803            if not file_in_local_collection:
 804                # File not uploaded yet, upload it completely
 805                should_upload = True
 806            elif file_in_local_collection.permission_expired():
 807                # Permission token expired, re-upload file. This will change whenever
 808                # we have an API for refreshing tokens.
 809                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
 810                should_upload = True
 811                self._local_collection.remove(filename)
 812            elif cached_file_data['size'] == file_in_local_collection.size():
 813                # File already there, skip it.
 814                self.bytes_skipped += cached_file_data['size']
 815            elif cached_file_data['size'] > file_in_local_collection.size():
 816                # File partially uploaded, resume!
 817                resume_offset = file_in_local_collection.size()
 818                self.bytes_skipped += resume_offset
 819                should_upload = True
 820            else:
 821                # Inconsistent cache, re-upload the file
 822                should_upload = True
 823                self._local_collection.remove(filename)
 824                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
 825        # Local file differs from cached data, re-upload it.
 826        else:
 827            if file_in_local_collection:
 828                self._local_collection.remove(filename)
 829            should_upload = True
 830
 831        if should_upload:
 832            try:
 833                self._files_to_upload.append((source, resume_offset, filename))
 834            except ArvPutUploadIsPending:
 835                # This could happen when running on dry-mode, close cache file to
 836                # avoid locking issues.
 837                self._cache_file.close()
 838                raise
 839
 840    def _upload_files(self):
 841        for source, resume_offset, filename in self._files_to_upload:
 842            with open(source, 'rb') as source_fd:
 843                with self._state_lock:
 844                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
 845                    self._state['files'][source]['size'] = os.path.getsize(source)
 846                if resume_offset > 0:
 847                    # Start upload where we left off
 848                    output = self._local_collection.open(filename, 'ab')
 849                    source_fd.seek(resume_offset)
 850                else:
 851                    # Start from scratch
 852                    output = self._local_collection.open(filename, 'wb')
 853                self._write(source_fd, output)
 854                output.close(flush=False)
 855
 856    def _write(self, source_fd, output):
 857        while True:
 858            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
 859            if not data:
 860                break
 861            output.write(data)
 862
 863    def _my_collection(self):
 864        return self._remote_collection if self.update else self._local_collection
 865
 866    def _get_cache_filepath(self):
 867        # Set up cache file name from input paths.
 868        md5 = hashlib.md5()
 869        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
 870        realpaths = sorted(os.path.realpath(path) for path in self.paths)
 871        md5.update(b'\0'.join([p.encode() for p in realpaths]))
 872        if self.filename:
 873            md5.update(self.filename.encode())
 874        cache_path = Path(self.CACHE_DIR)
 875        if len(cache_path.parts) == 1:
 876            cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path)
 877        else:
 878            # Note this is a noop if cache_path is absolute, which is what we want.
 879            cache_path = Path.home() / cache_path
 880            cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
 881        return str(cache_path / md5.hexdigest())
 882
 883    def _setup_state(self, update_collection):
 884        """
 885        Create a new cache file or load a previously existing one.
 886        """
 887        # Load an already existing collection for update
 888        if update_collection and re.match(arvados.util.collection_uuid_pattern,
 889                                          update_collection):
 890            try:
 891                self._remote_collection = arvados.collection.Collection(
 892                    update_collection,
 893                    api_client=self._api_client,
 894                    storage_classes_desired=self.storage_classes,
 895                    num_retries=self.num_retries)
 896            except arvados.errors.ApiError as error:
 897                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
 898            else:
 899                self.update = True
 900        elif update_collection:
 901            # Collection locator provided, but unknown format
 902            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 903
 904        if self.use_cache:
 905            cache_filepath = self._get_cache_filepath()
 906            if self.resume and os.path.exists(cache_filepath):
 907                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
 908                self._cache_file = open(cache_filepath, 'a+')
 909            else:
 910                # --no-resume means start with an empty cache file.
 911                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
 912                self._cache_file = open(cache_filepath, 'w+')
 913            self._cache_filename = self._cache_file.name
 914            self._lock_file(self._cache_file)
 915            self._cache_file.seek(0)
 916
 917        with self._state_lock:
 918            if self.use_cache:
 919                try:
 920                    self._state = json.load(self._cache_file)
 921                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
 922                        # Cache at least partially incomplete, set up new cache
 923                        self._state = copy.deepcopy(self.EMPTY_STATE)
 924                except ValueError:
 925                    # Cache file empty, set up new cache
 926                    self._state = copy.deepcopy(self.EMPTY_STATE)
 927            else:
 928                self.logger.info("No cache usage requested for this run.")
 929                # No cache file, set empty state
 930                self._state = copy.deepcopy(self.EMPTY_STATE)
 931            if not self._cached_manifest_valid():
 932                if not self.batch_mode:
 933                    raise ResumeCacheInvalidError()
 934                else:
 935                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
 936                    self.use_cache = False # Don't overwrite preexisting cache file.
 937                    self._state = copy.deepcopy(self.EMPTY_STATE)
 938            # Load the previous manifest so we can check if files were modified remotely.
 939            self._local_collection = arvados.collection.Collection(
 940                self._state['manifest'],
 941                replication_desired=self.replication_desired,
 942                storage_classes_desired=self.storage_classes,
 943                put_threads=self.put_threads,
 944                api_client=self._api_client,
 945                num_retries=self.num_retries)
 946
 947    def _cached_manifest_valid(self):
 948        """
 949        Validate the oldest non-expired block signature to check whether the cached
 950        manifest is usable, i.e. that it wasn't created with a different Arvados
 951        account's credentials.
 952        """
 953        if self._state.get('manifest', None) is None:
 954            # No cached manifest yet, all good.
 955            return True
 956        now = datetime.datetime.utcnow()
 957        oldest_exp = None
 958        oldest_loc = None
 959        block_found = False
 960        for m in arvados.util.keep_locator_pattern.finditer(self._state['manifest']):
 961            loc = m.group(0)
 962            try:
 963                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
 964            except IndexError:
 965                # Locator without signature
 966                continue
 967            block_found = True
 968            if exp > now and (oldest_exp is None or exp < oldest_exp):
 969                oldest_exp = exp
 970                oldest_loc = loc
 971        if not block_found:
 972            # No block signatures found => no invalid block signatures.
 973            return True
 974        if oldest_loc is None:
 975            # Locator signatures found, but all have expired.
 976            # Reset the cache and move on.
 977            self.logger.info('Cache expired, starting from scratch.')
 978            self._state['manifest'] = ''
 979            return True
 980        kc = arvados.KeepClient(api_client=self._api_client,
 981                                num_retries=self.num_retries)
 982        try:
 983            kc.head(oldest_loc)
 984        except arvados.errors.KeepRequestError:
 985            # Something is wrong, cached manifest is not valid.
 986            return False
 987        return True
 988
 989    def collection_file_paths(self, col, path_prefix='.'):
 990        """Return a list of file paths by recursively go through the entire collection `col`"""
 991        file_paths = []
 992        for name, item in col.items():
 993            if isinstance(item, arvados.arvfile.ArvadosFile):
 994                file_paths.append(os.path.join(path_prefix, name))
 995            elif isinstance(item, arvados.collection.Subcollection):
 996                new_prefix = os.path.join(path_prefix, name)
 997                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
 998        return file_paths
 999
1000    def _lock_file(self, fileobj):
1001        try:
1002            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
1003        except IOError:
1004            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
1005
1006    def _save_state(self):
1007        """
1008        Atomically save current state into cache.
1009        """
1010        with self._state_lock:
1011            # We're not using copy.deepcopy() here because it's a lot slower
1012            # than json.dumps(), and we already need the state in JSON format
1013            # to save it on disk.
1014            state = json.dumps(self._state)
1015        try:
1016            new_cache = tempfile.NamedTemporaryFile(
1017                mode='w+',
1018                dir=os.path.dirname(self._cache_filename), delete=False)
1019            self._lock_file(new_cache)
1020            new_cache.write(state)
1021            new_cache.flush()
1022            os.fsync(new_cache)
1023            os.rename(new_cache.name, self._cache_filename)
1024        except (IOError, OSError, ResumeCacheConflict) as error:
1025            self.logger.error("There was a problem while saving the cache file: {}".format(error))
1026            try:
1027                os.unlink(new_cache.name)
1028            except NameError:  # NamedTemporaryFile() itself failed.
1029                pass
1030        else:
1031            self._cache_file.close()
1032            self._cache_file = new_cache
1033
1034    def collection_name(self):
1035        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1036
1037    def collection_trash_at(self):
1038        return self._my_collection().get_trash_at()
1039
1040    def manifest_locator(self):
1041        return self._my_collection().manifest_locator()
1042
1043    def portable_data_hash(self):
1044        pdh = self._my_collection().portable_data_hash()
1045        m = self._my_collection().stripped_manifest().encode()
1046        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1047        if pdh != local_pdh:
1048            self.logger.warning("\n".join([
1049                "arv-put: API server provided PDH differs from local manifest.",
1050                "         This should not happen; showing API server version."]))
1051        return pdh
1052
1053    def manifest_text(self, stream_name=".", strip=False, normalize=False):
1054        return self._my_collection().manifest_text(stream_name, strip, normalize)
1055
1056    def _datablocks_on_item(self, item):
1057        """
1058        Return a list of datablock locators, recursively navigating
1059        through subcollections
1060        """
1061        if isinstance(item, arvados.arvfile.ArvadosFile):
1062            if item.size() == 0:
1063                # Empty file locator
1064                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1065            else:
1066                locators = []
1067                for segment in item.segments():
1068                    loc = segment.locator
1069                    locators.append(loc)
1070                return locators
1071        elif isinstance(item, arvados.collection.Collection):
1072            l = [self._datablocks_on_item(x) for x in item.values()]
1073            # Fast list flattener method taken from:
1074            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1075            return [loc for sublist in l for loc in sublist]
1076        else:
1077            return None
1078
1079    def data_locators(self):
1080        with self._collection_lock:
1081            # Make sure all datablocks are flushed before getting the locators
1082            self._my_collection().manifest_text()
1083            datablocks = self._datablocks_on_item(self._my_collection())
1084        return datablocks
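
A minimal end-to-end sketch of driving ArvPutUploadJob directly (the path, name and trash_at values are illustrative; leaving api_client as None assumes the standard ARVADOS_API_HOST/ARVADOS_API_TOKEN environment is configured so the SDK can build a default client):

    import datetime

    writer = ArvPutUploadJob(
        paths=['/tmp/dataset/'],
        name='example upload',
        trash_at=datetime.timedelta(days=30),  # relative expiration, like --trash-after 30
    )
    writer.start(save_collection=True)         # upload the files, then save the collection
    print(writer.collection_name())
    print(writer.manifest_locator())           # collection UUID
    print(writer.portable_data_hash())
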
ArvPutUploadJob( paths, resume=True, use_cache=True, reporter=None, name=None, owner_uuid=None, api_client=None, batch_mode=False, ensure_unique_name=False, num_retries=None, put_threads=None, replication_desired=None, filename=None, update_time=60.0, update_collection=None, storage_classes=None, logger=<Logger arvados.arv_put (WARNING)>, dry_run=False, follow_links=True, exclude_paths=[], exclude_names=None, trash_at=None)
454    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
455                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
456                 ensure_unique_name=False, num_retries=None,
457                 put_threads=None, replication_desired=None, filename=None,
458                 update_time=60.0, update_collection=None, storage_classes=None,
459                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
460                 follow_links=True, exclude_paths=[], exclude_names=None,
461                 trash_at=None):
462        self.paths = paths
463        self.resume = resume
464        self.use_cache = use_cache
465        self.batch_mode = batch_mode
466        self.update = False
467        self.reporter = reporter
468        # This will be set to 0 before we start counting, if no special files
469        # are going to be read.
470        self.bytes_expected = None
471        self.bytes_written = 0
472        self.bytes_skipped = 0
473        self.name = name
474        self.owner_uuid = owner_uuid
475        self.ensure_unique_name = ensure_unique_name
476        self.num_retries = num_retries
477        self.replication_desired = replication_desired
478        self.put_threads = put_threads
479        self.filename = filename
480        self.storage_classes = storage_classes
481        self._api_client = api_client
482        self._state_lock = threading.Lock()
483        self._state = None # Previous run state (file list & manifest)
484        self._current_files = [] # Current run file list
485        self._cache_file = None
486        self._collection_lock = threading.Lock()
487        self._remote_collection = None # Collection being updated (if asked)
488        self._local_collection = None # Collection from previous run manifest
489        self._file_paths = set() # Files to be updated in remote collection
490        self._stop_checkpointer = threading.Event()
491        self._checkpointer = threading.Thread(target=self._update_task)
492        self._checkpointer.daemon = True
493        self._update_task_time = update_time  # How many seconds to wait between update runs
494        self._files_to_upload = FileUploadList(dry_run=dry_run)
495        self._upload_started = False
496        self.logger = logger
497        self.dry_run = dry_run
498        self._checkpoint_before_quit = True
499        self.follow_links = follow_links
500        self.exclude_paths = exclude_paths
501        self.exclude_names = exclude_names
502        self._trash_at = trash_at
503
504        if self._trash_at is not None:
505            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
506                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
507            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
508                raise TypeError('provided trash_at datetime should be timezone-naive')
509
510        if not self.use_cache and self.resume:
511            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
512
513        # Check for obvious dry-run responses
514        if self.dry_run and (not self.use_cache or not self.resume):
515            raise ArvPutUploadIsPending()
516
517        # Load cached data if any and if needed
518        self._setup_state(update_collection)
519
520        # Build the upload file list, excluding requested files and counting the
521        # bytes expected to be uploaded.
522        self._build_upload_list()
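A minimal usage sketch (the class and exception names come from this module; the path and every argument value are hypothetical). Per the checks above, resume cannot stay enabled without use_cache, trash_at must be a timezone-naive datetime or a timedelta, and a dry run without a usable cache raises ArvPutUploadIsPending straight from the constructor:

    import datetime
    from arvados.commands.put import ArvPutUploadJob, ArvPutUploadIsPending

    try:
        job = ArvPutUploadJob(
            paths=['data/'],                      # hypothetical local directory
            use_cache=False,                      # no resume cache
            resume=False,                         # required when use_cache is False
            dry_run=True,                         # only ask whether uploads are pending
            trash_at=datetime.timedelta(days=7),  # a naive datetime is also accepted
        )
    except ArvPutUploadIsPending:
        # Without a cache every file counts as pending, so this is raised here.
        print("files are pending upload")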
CACHE_DIR = 'arv-put'
EMPTY_STATE = {'manifest': None, 'files': {}}
paths
resume
use_cache
batch_mode
update
reporter
bytes_expected
bytes_written
bytes_skipped
name
owner_uuid
ensure_unique_name
num_retries
replication_desired
put_threads
filename
storage_classes
logger
dry_run
follow_links
exclude_paths
exclude_names
def start(self, save_collection):
615    def start(self, save_collection):
616        """
617        Start supporting thread & file uploading
618        """
619        self._checkpointer.start()
620        try:
621            # Update bytes_written from current local collection and
622            # report initial progress.
623            self._update()
624            # Actual file upload
625            self._upload_started = True # Used by the update thread to start checkpointing
626            self._upload_files()
627        except (SystemExit, Exception) as e:
628            self._checkpoint_before_quit = False
629            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
630            # Note: We're expecting SystemExit instead of
631            # KeyboardInterrupt because we have a custom signal
632            # handler in place that raises SystemExit with the caught
633            # signal's code.
634            if isinstance(e, PathDoesNotExistError):
635                # We aren't interested in the traceback for this case
636                pass
637            elif not isinstance(e, SystemExit) or e.code != -2:
638                self.logger.warning("Abnormal termination:\n{}".format(
639                    traceback.format_exc()))
640            raise
641        finally:
642            if not self.dry_run:
643                # Stop the thread before doing anything else
644                self._stop_checkpointer.set()
645                self._checkpointer.join()
646                if self._checkpoint_before_quit:
647                    # Commit all pending blocks & one last _update()
648                    self._local_collection.manifest_text()
649                    self._update(final=True)
650                    if save_collection:
651                        self.save_collection()
652            if self.use_cache:
653                self._cache_file.close()

Start supporting thread & file uploading
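
For reference, the usual call sequence mirrors what main() below does: build the job, call start() with save_collection=True, then read the resulting name and locator. A minimal sketch, assuming ARVADOS_API_HOST/ARVADOS_API_TOKEN are configured and that the (hypothetical) local file exists:

    from arvados.commands.put import ArvPutUploadJob

    job = ArvPutUploadJob(paths=['data/file.txt'],
                          name='example upload',          # hypothetical name
                          use_cache=False, resume=False)  # skip the resume cache
    job.start(save_collection=True)   # upload the files, then save a Collection
    print(job.collection_name())      # 'example upload'
    print(job.manifest_locator())     # the new collection's UUID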

def save_collection(self):
666    def save_collection(self):
667        if self.update:
668            # Check if files should be updated on the remote collection.
669            for fp in self._file_paths:
670                remote_file = self._remote_collection.find(fp)
671                if not remote_file:
672                    # File doesn't exist on the remote collection, copy it.
673                    self._remote_collection.copy(fp, fp, self._local_collection)
674                elif remote_file != self._local_collection.find(fp):
675                    # A different file exists on the remote collection, overwrite it.
676                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
677                else:
678                    # The file already exists on the remote collection, skip it.
679                    pass
680            self._remote_collection.save(num_retries=self.num_retries,
681                                         trash_at=self._collection_trash_at())
682        else:
683            if len(self._local_collection) == 0:
684                self.logger.warning("No files were uploaded, skipping collection creation.")
685                return
686            self._local_collection.save_new(
687                name=self.name, owner_uuid=self.owner_uuid,
688                ensure_unique_name=self.ensure_unique_name,
689                num_retries=self.num_retries,
690                trash_at=self._collection_trash_at())
def destroy_cache(self):
692    def destroy_cache(self):
693        if self.use_cache:
694            try:
695                os.unlink(self._cache_filename)
696            except OSError as error:
697                # That's what we wanted anyway.
698                if error.errno != errno.ENOENT:
699                    raise
700            self._cache_file.close()
def report_progress(self):
759    def report_progress(self):
760        if self.reporter is not None:
761            self.reporter(self.bytes_written, self.bytes_expected)
def collection_file_paths(self, col, path_prefix='.'):
989    def collection_file_paths(self, col, path_prefix='.'):
990        """Return a list of file paths by recursively go through the entire collection `col`"""
991        file_paths = []
992        for name, item in col.items():
993            if isinstance(item, arvados.arvfile.ArvadosFile):
994                file_paths.append(os.path.join(path_prefix, name))
995            elif isinstance(item, arvados.collection.Subcollection):
996                new_prefix = os.path.join(path_prefix, name)
997                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
998        return file_paths

Return a list of file paths by recursively going through the entire collection col
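
The same traversal can be tried outside the job against any Collection object; a sketch assuming the SDK can build a Collection from literal manifest text without contacting an API server (the manifest below describes two empty files, one inside a subdirectory):

    import os
    import arvados.arvfile
    import arvados.collection

    def walk(col, prefix='.'):
        # Same pattern as collection_file_paths(): collect files, recurse into
        # subcollections.
        paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                paths.append(os.path.join(prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                paths += walk(item, os.path.join(prefix, name))
        return paths

    manifest = (". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo.txt\n"
                "./sub d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar.txt\n")
    col = arvados.collection.Collection(manifest)
    print(walk(col))   # expected (order may vary): ['./foo.txt', './sub/bar.txt']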

def collection_name(self):
1034    def collection_name(self):
1035        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
def collection_trash_at(self):
1037    def collection_trash_at(self):
1038        return self._my_collection().get_trash_at()
def manifest_locator(self):
1040    def manifest_locator(self):
1041        return self._my_collection().manifest_locator()
def portable_data_hash(self):
1043    def portable_data_hash(self):
1044        pdh = self._my_collection().portable_data_hash()
1045        m = self._my_collection().stripped_manifest().encode()
1046        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1047        if pdh != local_pdh:
1048            self.logger.warning("\n".join([
1049                "arv-put: API server provided PDH differs from local manifest.",
1050                "         This should not happen; showing API server version."]))
1051        return pdh
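The locally computed value in the check above is the standard portable data hash: the MD5 hex digest of the stripped manifest text, a '+', and the manifest's length in bytes. For illustration, with a literal one-file manifest:

    import hashlib

    stripped_manifest = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo.txt\n"
    m = stripped_manifest.encode()
    local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
    print(local_pdh)   # '<32 hex digits>+<manifest length in bytes>'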
def manifest_text(self, stream_name='.', strip=False, normalize=False):
1053    def manifest_text(self, stream_name=".", strip=False, normalize=False):
1054        return self._my_collection().manifest_text(stream_name, strip, normalize)
def data_locators(self):
1079    def data_locators(self):
1080        with self._collection_lock:
1081            # Make sure all datablocks are flushed before getting the locators
1082            self._my_collection().manifest_text()
1083            datablocks = self._datablocks_on_item(self._my_collection())
1084        return datablocks
def pathname_match(pathname, pattern):
1093def pathname_match(pathname, pattern):
1094    name = pathname.split(os.sep)
1095    # Fix patterns like 'some/subdir/' or 'some//subdir'
1096    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1097    if len(name) != len(pat):
1098        return False
1099    for i in range(len(name)):
1100        if not fnmatch.fnmatch(name[i], pat[i]):
1101            return False
1102    return True
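Both arguments are split on os.sep; empty and '.' components are dropped from the pattern, the component counts must match, and each component is then compared with fnmatch. A few illustrative calls (assuming a POSIX path separator):

    from arvados.commands.put import pathname_match

    pathname_match('docs/report.pdf', '*/*.pdf')   # True: two components each
    pathname_match('docs/report.pdf', '*.pdf')     # False: component counts differ
    pathname_match('docs/report.pdf', './docs/*')  # True: the leading './' is dropped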
def machine_progress(bytes_written, bytes_expected):
1104def machine_progress(bytes_written, bytes_expected):
1105    return _machine_format.format(
1106        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
def human_progress(bytes_written, bytes_expected):
1108def human_progress(bytes_written, bytes_expected):
1109    if bytes_expected:
1110        return "\r{}M / {}M {:.1%} ".format(
1111            bytes_written >> 20, bytes_expected >> 20,
1112            float(bytes_written) / bytes_expected)
1113    else:
1114        return "\r{} ".format(bytes_written)
def progress_writer(progress_func, outfile=<_io.TextIOWrapper encoding='UTF-8'>):
1116def progress_writer(progress_func, outfile=sys.stderr):
1117    def write_progress(bytes_written, bytes_expected):
1118        outfile.write(progress_func(bytes_written, bytes_expected))
1119    return write_progress
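progress_writer just binds one of the formatters above to an output stream, producing the reporter callback used by ArvPutUploadJob:

    import sys
    from arvados.commands.put import progress_writer, human_progress

    reporter = progress_writer(human_progress, outfile=sys.stderr)
    reporter(50 << 20, 100 << 20)   # writes '\r50M / 100M 50.0% ' to stderr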
def desired_project_uuid(api_client, project_uuid, num_retries):
1121def desired_project_uuid(api_client, project_uuid, num_retries):
1122    if not project_uuid:
1123        query = api_client.users().current()
1124    elif arvados.util.user_uuid_pattern.match(project_uuid):
1125        query = api_client.users().get(uuid=project_uuid)
1126    elif arvados.util.group_uuid_pattern.match(project_uuid):
1127        query = api_client.groups().get(uuid=project_uuid)
1128    else:
1129        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1130    return query.execute(num_retries=num_retries)['uuid']
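desired_project_uuid falls back to the current user when no project is given, checks the UUID shape before issuing a request, and raises ValueError for anything that is neither a user nor a group UUID. A sketch assuming an authenticated API client:

    import arvados
    from arvados.commands.put import desired_project_uuid

    api = arvados.api('v1')   # assumes ARVADOS_API_HOST / ARVADOS_API_TOKEN are set

    # No project given: resolves to the current user's own UUID.
    print(desired_project_uuid(api, None, num_retries=3))

    # Malformed input fails before any API call is made.
    try:
        desired_project_uuid(api, 'not-a-uuid', num_retries=3)
    except ValueError as err:
        print(err)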
def main( arguments=None, stdout=<_io.TextIOWrapper encoding='UTF-8'>, stderr=<_io.TextIOWrapper encoding='UTF-8'>, install_sig_handlers=True):
1132def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1133         install_sig_handlers=True):
1134    global api_client
1135
1136    args = parse_arguments(arguments)
1137    logger = logging.getLogger('arvados.arv_put')
1138    if args.silent:
1139        logger.setLevel(logging.WARNING)
1140    else:
1141        logger.setLevel(logging.INFO)
1142    status = 0
1143
1144    request_id = arvados.util.new_request_id()
1145
1146    formatter = ArvPutLogFormatter(request_id)
1147    logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1148
1149    if api_client is None:
1150        api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
1151
1152    if install_sig_handlers:
1153        arv_cmd.install_signal_handlers()
1154
1155    # Trash arguments validation
1156    trash_at = None
1157    if args.trash_at is not None:
1158        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1159        # make sure the user provides a complete YYYY-MM-DD date.
1160        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1161            logger.error("--trash-at argument format invalid, use --help to see examples.")
1162            sys.exit(1)
1163        # Check if no time information was provided. In that case, assume end-of-day.
1164        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1165            args.trash_at += 'T23:59:59'
1166        try:
1167            trash_at = ciso8601.parse_datetime(args.trash_at)
1168        except:
1169            logger.error("--trash-at argument format invalid, use --help to see examples.")
1170            sys.exit(1)
1171        else:
1172            if trash_at.tzinfo is not None:
1173                # Timezone aware datetime provided.
1174                utcoffset = -trash_at.utcoffset()
1175            else:
1176                # Timezone-naive datetime provided. Assume it is local.
1177                if time.daylight:
1178                    utcoffset = datetime.timedelta(seconds=time.altzone)
1179                else:
1180                    utcoffset = datetime.timedelta(seconds=time.timezone)
1181            # Convert to UTC timezone naive datetime.
1182            trash_at = trash_at.replace(tzinfo=None) + utcoffset
1183
1184        if trash_at <= datetime.datetime.utcnow():
1185            logger.error("--trash-at argument must be set in the future")
1186            sys.exit(1)
1187    if args.trash_after is not None:
1188        if args.trash_after < 1:
1189            logger.error("--trash-after argument must be >= 1")
1190            sys.exit(1)
1191        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1192
1193    # Determine the name to use
1194    if args.name:
1195        if args.stream or args.raw:
1196            logger.error("Cannot use --name with --stream or --raw")
1197            sys.exit(1)
1198        elif args.update_collection:
1199            logger.error("Cannot use --name with --update-collection")
1200            sys.exit(1)
1201        collection_name = args.name
1202    else:
1203        collection_name = "Saved at {} by {}@{}".format(
1204            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1205            pwd.getpwuid(os.getuid()).pw_name,
1206            socket.gethostname())
1207
1208    if args.project_uuid and (args.stream or args.raw):
1209        logger.error("Cannot use --project-uuid with --stream or --raw")
1210        sys.exit(1)
1211
1212    # Determine the parent project
1213    try:
1214        project_uuid = desired_project_uuid(api_client, args.project_uuid,
1215                                            args.retries)
1216    except (apiclient_errors.Error, ValueError) as error:
1217        logger.error(error)
1218        sys.exit(1)
1219
1220    if args.progress:
1221        reporter = progress_writer(human_progress)
1222    elif args.batch_progress:
1223        reporter = progress_writer(machine_progress)
1224    else:
1225        reporter = None
1226
1227    # Set up the exclude regex from all the --exclude arguments provided
1228    name_patterns = []
1229    exclude_paths = []
1230    exclude_names = None
1231    if len(args.exclude) > 0:
1232        # We're supporting 2 kinds of exclusion patterns:
1233        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1234        #                            the name, wherever the file is on the tree)
1235        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1236        #                            entire path, and should be relative to
1237        #                            any input dir argument)
1238        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1239        #                            placed directly underneath the input dir)
1240        for p in args.exclude:
1241            # Only relative paths patterns allowed
1242            if p.startswith(os.sep):
1243                logger.error("Cannot use absolute paths with --exclude")
1244                sys.exit(1)
1245            if os.path.dirname(p):
1246                # We don't support path patterns with '..'
1247                p_parts = p.split(os.sep)
1248                if '..' in p_parts:
1249                    logger.error(
1250                        "Cannot use path patterns that include or '..'")
1251                    sys.exit(1)
1252                # Path search pattern
1253                exclude_paths.append(p)
1254            else:
1255                # Name-only search pattern
1256                name_patterns.append(p)
1257        # For name only matching, we can combine all patterns into a single
1258        # regexp, for better performance.
1259        exclude_names = re.compile('|'.join(
1260            [fnmatch.translate(p) for p in name_patterns]
1261        )) if len(name_patterns) > 0 else None
1262        # Show the user the patterns to be used, just in case they weren't
1263        # specified inside quotes and got changed by the shell expansion.
1264        logger.info("Exclude patterns: {}".format(args.exclude))
1265
1266    # If this is used by a human, and there's at least one directory to be
1267    # uploaded, the expected bytes calculation can take a moment.
1268    if args.progress and any([os.path.isdir(f) for f in args.paths]):
1269        logger.info("Calculating upload size, this could take some time...")
1270    try:
1271        writer = ArvPutUploadJob(paths = args.paths,
1272                                 resume = args.resume,
1273                                 use_cache = args.use_cache,
1274                                 batch_mode= args.batch,
1275                                 filename = args.filename,
1276                                 reporter = reporter,
1277                                 api_client = api_client,
1278                                 num_retries = args.retries,
1279                                 replication_desired = args.replication,
1280                                 put_threads = args.threads,
1281                                 name = collection_name,
1282                                 owner_uuid = project_uuid,
1283                                 ensure_unique_name = True,
1284                                 update_collection = args.update_collection,
1285                                 storage_classes=args.storage_classes,
1286                                 logger=logger,
1287                                 dry_run=args.dry_run,
1288                                 follow_links=args.follow_links,
1289                                 exclude_paths=exclude_paths,
1290                                 exclude_names=exclude_names,
1291                                 trash_at=trash_at)
1292    except ResumeCacheConflict:
1293        logger.error("\n".join([
1294            "arv-put: Another process is already uploading this data.",
1295            "         Use --no-cache if this is really what you want."]))
1296        sys.exit(1)
1297    except ResumeCacheInvalidError:
1298        logger.error("\n".join([
1299            "arv-put: Resume cache contains invalid signature: it may have expired",
1300            "         or been created with another Arvados user's credentials.",
1301            "         Switch user or use one of the following options to restart upload:",
1302            "         --no-resume to start a new resume cache.",
1303            "         --no-cache to disable resume cache.",
1304            "         --batch to ignore the resume cache if invalid."]))
1305        sys.exit(1)
1306    except (CollectionUpdateError, PathDoesNotExistError) as error:
1307        logger.error("\n".join([
1308            "arv-put: %s" % str(error)]))
1309        sys.exit(1)
1310    except ArvPutUploadIsPending:
1311        # Dry run check successful, return proper exit code.
1312        sys.exit(2)
1313    except ArvPutUploadNotPending:
1314        # No files pending for upload
1315        sys.exit(0)
1316
1317    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1318        logger.warning("\n".join([
1319            "arv-put: Resuming previous upload from last checkpoint.",
1320            "         Use the --no-resume option to start over."]))
1321
1322    if not args.dry_run:
1323        writer.report_progress()
1324    output = None
1325    try:
1326        writer.start(save_collection=not(args.stream or args.raw))
1327    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
1328        logger.error("\n".join([
1329            "arv-put: %s" % str(error)]))
1330        sys.exit(1)
1331
1332    if args.progress:  # Print newline to split stderr from stdout for humans.
1333        logger.info("\n")
1334
1335    if args.stream:
1336        if args.normalize:
1337            output = writer.manifest_text(normalize=True)
1338        else:
1339            output = writer.manifest_text()
1340    elif args.raw:
1341        output = ','.join(writer.data_locators())
1342    elif writer.manifest_locator() is not None:
1343        try:
1344            expiration_notice = ""
1345            if writer.collection_trash_at() is not None:
1346                # Get the local timezone-naive version, and log it with timezone information.
1347                if time.daylight:
1348                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1349                else:
1350                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1351                expiration_notice = ". It will expire on {} {}.".format(
1352                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1353            if args.update_collection:
1354                logger.info(u"Collection updated: '{}'{}".format(
1355                    writer.collection_name(), expiration_notice))
1356            else:
1357                logger.info(u"Collection saved as '{}'{}".format(
1358                    writer.collection_name(), expiration_notice))
1359            if args.portable_data_hash:
1360                output = writer.portable_data_hash()
1361            else:
1362                output = writer.manifest_locator()
1363        except apiclient_errors.Error as error:
1364            logger.error(
1365                "arv-put: Error creating Collection on project: {}.".format(
1366                    error))
1367            status = 1
1368    else:
1369        status = 1
1370
1371    # Print the locator (uuid) of the new collection.
1372    if output is None:
1373        status = status or 1
1374    elif not args.silent:
1375        stdout.write(output)
1376        if not output.endswith('\n'):
1377            stdout.write('\n')
1378
1379    if install_sig_handlers:
1380        arv_cmd.restore_signal_handlers()
1381
1382    if status != 0:
1383        sys.exit(status)
1384
1385    # Success!
1386    return output
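main() can also be driven programmatically: on success it writes the locator to the given stdout (unless --silent) and returns it, while errors and the --dry-run status codes go through sys.exit. A minimal sketch, assuming a configured Arvados environment and an existing local file (the path is hypothetical):

    import io
    from arvados.commands.put import main

    out = io.StringIO()
    locator = main(['data/file.txt'], stdout=out, install_sig_handlers=False)
    print(locator)           # collection UUID (or PDH when --portable-data-hash is used)
    print(out.getvalue())    # the same locator, newline-terminated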