arvados.commands.put

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4
   5from __future__ import division
   6from future.utils import listitems, listvalues
   7from builtins import str
   8from builtins import object
   9import argparse
  10import arvados
  11import arvados.collection
  12import base64
  13import ciso8601
  14import copy
  15import datetime
  16import errno
  17import fcntl
  18import fnmatch
  19import hashlib
  20import json
  21import logging
  22import os
  23import pwd
  24import re
  25import signal
  26import socket
  27import sys
  28import tempfile
  29import threading
  30import time
  31import traceback
  32
  33from apiclient import errors as apiclient_errors
  34from arvados._version import __version__
  35from arvados.util import keep_locator_pattern
  36
  37import arvados.commands._util as arv_cmd
  38
  39api_client = None
  40
  41upload_opts = argparse.ArgumentParser(add_help=False)
  42
  43upload_opts.add_argument('--version', action='version',
  44                         version="%s %s" % (sys.argv[0], __version__),
  45                         help='Print version and exit.')
  46upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
  47                         help="""
  48Local file or directory. If path is a directory reference with a trailing
  49slash, then just upload the directory's contents; otherwise upload the
  50directory itself. Default: read from standard input.
  51""")
  52
  53_group = upload_opts.add_mutually_exclusive_group()
  54
  55_group.add_argument('--max-manifest-depth', type=int, metavar='N',
  56                    default=-1, help=argparse.SUPPRESS)
  57
  58_group.add_argument('--normalize', action='store_true',
  59                    help="""
  60Normalize the manifest by re-ordering files and streams after writing
  61data.
  62""")
  63
  64_group.add_argument('--dry-run', action='store_true', default=False,
  65                    help="""
  66Don't actually upload files, but only check if any file should be
  67uploaded. Exit with code=2 when files are pending for upload.
  68""")
  69
  70_group = upload_opts.add_mutually_exclusive_group()
  71
  72_group.add_argument('--as-stream', action='store_true', dest='stream',
  73                    help="""
  74Synonym for --stream.
  75""")
  76
  77_group.add_argument('--stream', action='store_true',
  78                    help="""
  79Store the file content and display the resulting manifest on
  80stdout. Do not save a Collection object in Arvados.
  81""")
  82
  83_group.add_argument('--as-manifest', action='store_true', dest='manifest',
  84                    help="""
  85Synonym for --manifest.
  86""")
  87
  88_group.add_argument('--in-manifest', action='store_true', dest='manifest',
  89                    help="""
  90Synonym for --manifest.
  91""")
  92
  93_group.add_argument('--manifest', action='store_true',
  94                    help="""
  95Store the file data and resulting manifest in Keep, save a Collection
  96object in Arvados, and display the manifest locator (Collection uuid)
  97on stdout. This is the default behavior.
  98""")
  99
 100_group.add_argument('--as-raw', action='store_true', dest='raw',
 101                    help="""
 102Synonym for --raw.
 103""")
 104
 105_group.add_argument('--raw', action='store_true',
 106                    help="""
 107Store the file content and display the data block locators on stdout,
 108separated by commas, with a trailing newline. Do not store a
 109manifest.
 110""")
 111
 112upload_opts.add_argument('--update-collection', type=str, default=None,
 113                         dest='update_collection', metavar="UUID", help="""
 114Update an existing collection identified by the given Arvados collection
 115UUID. All new local files will be uploaded.
 116""")
 117
 118upload_opts.add_argument('--use-filename', type=str, default=None,
 119                         dest='filename', help="""
 120Synonym for --filename.
 121""")
 122
 123upload_opts.add_argument('--filename', type=str, default=None,
 124                         help="""
 125Use the given filename in the manifest, instead of the name of the
 126local file. This is useful when "-" or "/dev/stdin" is given as an
 127input file. It can be used only if there is exactly one path given and
 128it is not a directory. Implies --manifest.
 129""")
 130
 131upload_opts.add_argument('--portable-data-hash', action='store_true',
 132                         help="""
 133Print the portable data hash instead of the Arvados UUID for the collection
 134created by the upload.
 135""")
 136
 137upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
 138                         help="""
 139Set the replication level for the new collection: how many different
 140physical storage devices (e.g., disks) should have a copy of each data
 141block. Default is to use the server-provided default (if any) or 2.
 142""")
 143
 144upload_opts.add_argument('--storage-classes', help="""
 145Specify comma separated list of storage classes to be used when saving data to Keep.
 146""")
 147
 148upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
 149                         help="""
 150Set the number of upload threads to be used. Take into account that
 151using lots of threads will increase the RAM requirements. Default is
 152to use 2 threads.
 153On high latency installations, using a greater number will improve
 154overall throughput.
 155""")
 156
 157upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
 158                      action='append', help="""
 159Exclude files and directories whose names match the given glob pattern. When
  160using a path-like pattern like 'subdir/*.txt', all text files inside the
  161'subdir' directory (relative to the provided input dirs) will be excluded.
 162When using a filename pattern like '*.txt', any text file will be excluded
 163no matter where it is placed.
 164For the special case of needing to exclude only files or dirs directly below
 165the given input directory, you can use a pattern like './exclude_this.gif'.
 166You can specify multiple patterns by using this argument more than once.
 167""")
 168
 169_group = upload_opts.add_mutually_exclusive_group()
 170_group.add_argument('--follow-links', action='store_true', default=True,
 171                    dest='follow_links', help="""
 172Follow file and directory symlinks (default).
 173""")
 174_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
 175                    help="""
 176Ignore file and directory symlinks. Even paths given explicitly on the
 177command line will be skipped if they are symlinks.
 178""")
 179
 180
 181run_opts = argparse.ArgumentParser(add_help=False)
 182
 183run_opts.add_argument('--project-uuid', metavar='UUID', help="""
 184Store the collection in the specified project, instead of your Home
 185project.
 186""")
 187
 188run_opts.add_argument('--name', help="""
 189Save the collection with the specified name.
 190""")
 191
 192_group = run_opts.add_mutually_exclusive_group()
 193_group.add_argument('--progress', action='store_true',
 194                    help="""
 195Display human-readable progress on stderr (bytes and, if possible,
 196percentage of total data size). This is the default behavior when
 197stderr is a tty.
 198""")
 199
 200_group.add_argument('--no-progress', action='store_true',
 201                    help="""
 202Do not display human-readable progress on stderr, even if stderr is a
 203tty.
 204""")
 205
 206_group.add_argument('--batch-progress', action='store_true',
 207                    help="""
 208Display machine-readable progress on stderr (bytes and, if known,
 209total data size).
 210""")
 211
 212run_opts.add_argument('--silent', action='store_true',
 213                      help="""
 214Do not print any debug messages to console. (Any error messages will
 215still be displayed.)
 216""")
 217
 218run_opts.add_argument('--batch', action='store_true', default=False,
 219                      help="""
  220Retry the upload with '--no-resume --no-cache' if the cached state contains
  221invalid/expired block signatures.
 222""")
 223
 224_group = run_opts.add_mutually_exclusive_group()
 225_group.add_argument('--resume', action='store_true', default=True,
 226                    help="""
 227Continue interrupted uploads from cached state (default).
 228""")
 229_group.add_argument('--no-resume', action='store_false', dest='resume',
 230                    help="""
 231Do not continue interrupted uploads from cached state.
 232""")
 233
 234_group = run_opts.add_mutually_exclusive_group()
 235_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
 236                    help="""
 237Save upload state in a cache file for resuming (default).
 238""")
 239_group.add_argument('--no-cache', action='store_false', dest='use_cache',
 240                    help="""
 241Do not save upload state in a cache file for resuming.
 242""")
 243
 244_group = upload_opts.add_mutually_exclusive_group()
 245_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
 246                    help="""
 247Set the trash date of the resulting collection to an absolute date in the future.
 248The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
  249Timezone information can be added. If not provided, the date/time is assumed to be in the local system's timezone.
 250""")
 251_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
 252                    help="""
  253Set the trash date of the resulting collection to the given number of days
  254after the date/time that the upload process finishes.
 255""")
 256
 257arg_parser = argparse.ArgumentParser(
 258    description='Copy data from the local filesystem to Keep.',
 259    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
 260
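# Typical invocations (paths and the project UUID are illustrative):
#
#   arv-put file1.txt data/                  # upload files/dirs, print collection UUID
#   arv-put --project-uuid zzzzz-j7d0g-0123456789abcde data/
#   cat report.csv | arv-put --filename report.csv   # read from stdin under a given name
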
 261def parse_arguments(arguments):
 262    args = arg_parser.parse_args(arguments)
 263
 264    if len(args.paths) == 0:
 265        args.paths = ['-']
 266
 267    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
 268
 269    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
 270        arg_parser.error("""
 271    --filename argument cannot be used when storing a directory or
 272    multiple files.
 273    """)
 274
 275    # Turn on --progress by default if stderr is a tty.
 276    if (not (args.batch_progress or args.no_progress or args.silent)
 277        and os.isatty(sys.stderr.fileno())):
 278        args.progress = True
 279
 280    # Turn off --resume (default) if --no-cache is used.
 281    if not args.use_cache:
 282        args.resume = False
 283
 284    if args.paths == ['-']:
 285        if args.update_collection:
 286            arg_parser.error("""
 287    --update-collection cannot be used when reading from stdin.
 288    """)
 289        args.resume = False
 290        args.use_cache = False
 291        if not args.filename:
 292            args.filename = 'stdin'
 293
 294    # Remove possible duplicated patterns
 295    if len(args.exclude) > 0:
 296        args.exclude = list(set(args.exclude))
 297
 298    return args
 299
 300
 301class PathDoesNotExistError(Exception):
 302    pass
 303
 304
 305class CollectionUpdateError(Exception):
 306    pass
 307
 308
 309class ResumeCacheConflict(Exception):
 310    pass
 311
 312
 313class ResumeCacheInvalidError(Exception):
 314    pass
 315
 316class ArvPutArgumentConflict(Exception):
 317    pass
 318
 319
 320class ArvPutUploadIsPending(Exception):
 321    pass
 322
 323
 324class ArvPutUploadNotPending(Exception):
 325    pass
 326
 327
 328class FileUploadList(list):
 329    def __init__(self, dry_run=False):
 330        list.__init__(self)
 331        self.dry_run = dry_run
 332
 333    def append(self, other):
 334        if self.dry_run:
 335            raise ArvPutUploadIsPending()
 336        super(FileUploadList, self).append(other)
 337
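# With dry_run=True, the first append() (i.e. the first file that would need
# uploading) raises ArvPutUploadIsPending, which lets --dry-run exit with
# status 2 without writing any data.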
 338
 339# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
 340class ArvPutLogFormatter(logging.Formatter):
 341    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
 342    err_fmtr = None
 343    request_id_informed = False
 344
 345    def __init__(self, request_id):
 346        self.err_fmtr = logging.Formatter(
 347            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
 348            arvados.log_date_format)
 349
 350    def format(self, record):
 351        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
 352            self.request_id_informed = True
 353            return self.err_fmtr.format(record)
 354        return self.std_fmtr.format(record)
 355
 356
 357class ResumeCache(object):
 358    CACHE_DIR = '.cache/arvados/arv-put'
 359
 360    def __init__(self, file_spec):
 361        self.cache_file = open(file_spec, 'a+')
 362        self._lock_file(self.cache_file)
 363        self.filename = self.cache_file.name
 364
 365    @classmethod
 366    def make_path(cls, args):
 367        md5 = hashlib.md5()
 368        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
 369        realpaths = sorted(os.path.realpath(path) for path in args.paths)
 370        md5.update(b'\0'.join([p.encode() for p in realpaths]))
 371        if any(os.path.isdir(path) for path in realpaths):
 372            md5.update(b'-1')
 373        elif args.filename:
 374            md5.update(args.filename.encode())
 375        return os.path.join(
 376            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
 377            md5.hexdigest())
 378
 379    def _lock_file(self, fileobj):
 380        try:
 381            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
 382        except IOError:
 383            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
 384
 385    def load(self):
 386        self.cache_file.seek(0)
 387        return json.load(self.cache_file)
 388
 389    def check_cache(self, api_client=None, num_retries=0):
 390        try:
 391            state = self.load()
 392            locator = None
 393            try:
 394                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
 395                    locator = state["_finished_streams"][0][1][0]
 396                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
 397                    locator = state["_current_stream_locators"][0]
 398                if locator is not None:
 399                    kc = arvados.keep.KeepClient(api_client=api_client)
 400                    kc.head(locator, num_retries=num_retries)
 401            except Exception as e:
 402                self.restart()
  403        except ValueError:
 404            pass
 405
 406    def save(self, data):
 407        try:
 408            new_cache_fd, new_cache_name = tempfile.mkstemp(
 409                dir=os.path.dirname(self.filename))
 410            self._lock_file(new_cache_fd)
 411            new_cache = os.fdopen(new_cache_fd, 'r+')
 412            json.dump(data, new_cache)
 413            os.rename(new_cache_name, self.filename)
 414        except (IOError, OSError, ResumeCacheConflict):
 415            try:
 416                os.unlink(new_cache_name)
 417            except NameError:  # mkstemp failed.
 418                pass
 419        else:
 420            self.cache_file.close()
 421            self.cache_file = new_cache
 422
 423    def close(self):
 424        self.cache_file.close()
 425
 426    def destroy(self):
 427        try:
 428            os.unlink(self.filename)
 429        except OSError as error:
 430            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
 431                raise
 432        self.close()
 433
 434    def restart(self):
 435        self.destroy()
 436        self.__init__(self.filename)
 437
 438
 439class ArvPutUploadJob(object):
 440    CACHE_DIR = '.cache/arvados/arv-put'
 441    EMPTY_STATE = {
 442        'manifest' : None, # Last saved manifest checkpoint
 443        'files' : {} # Previous run file list: {path : {size, mtime}}
 444    }
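
    # Illustrative cache state, as written to disk by _save_state() (the path,
    # signature and expiry are hypothetical):
    #
    #   {
    #     "manifest": ". acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry> 0:3:foo.txt\n",
    #     "files": {"/home/user/foo.txt": {"mtime": 1596123456.0, "size": 3}}
    #   }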
 445
 446    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
 447                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
 448                 ensure_unique_name=False, num_retries=None,
 449                 put_threads=None, replication_desired=None, filename=None,
 450                 update_time=60.0, update_collection=None, storage_classes=None,
 451                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
 452                 follow_links=True, exclude_paths=[], exclude_names=None,
 453                 trash_at=None):
 454        self.paths = paths
 455        self.resume = resume
 456        self.use_cache = use_cache
 457        self.batch_mode = batch_mode
 458        self.update = False
 459        self.reporter = reporter
  460        # This will be set to 0 before counting starts, if no special files are
  461        # going to be read.
 462        self.bytes_expected = None
 463        self.bytes_written = 0
 464        self.bytes_skipped = 0
 465        self.name = name
 466        self.owner_uuid = owner_uuid
 467        self.ensure_unique_name = ensure_unique_name
 468        self.num_retries = num_retries
 469        self.replication_desired = replication_desired
 470        self.put_threads = put_threads
 471        self.filename = filename
 472        self.storage_classes = storage_classes
 473        self._api_client = api_client
 474        self._state_lock = threading.Lock()
 475        self._state = None # Previous run state (file list & manifest)
 476        self._current_files = [] # Current run file list
 477        self._cache_file = None
 478        self._collection_lock = threading.Lock()
 479        self._remote_collection = None # Collection being updated (if asked)
 480        self._local_collection = None # Collection from previous run manifest
 481        self._file_paths = set() # Files to be updated in remote collection
 482        self._stop_checkpointer = threading.Event()
 483        self._checkpointer = threading.Thread(target=self._update_task)
 484        self._checkpointer.daemon = True
  485        self._update_task_time = update_time  # How many seconds to wait between update runs
 486        self._files_to_upload = FileUploadList(dry_run=dry_run)
 487        self._upload_started = False
 488        self.logger = logger
 489        self.dry_run = dry_run
 490        self._checkpoint_before_quit = True
 491        self.follow_links = follow_links
 492        self.exclude_paths = exclude_paths
 493        self.exclude_names = exclude_names
 494        self._trash_at = trash_at
 495
 496        if self._trash_at is not None:
 497            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
 498                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
 499            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
 500                raise TypeError('provided trash_at datetime should be timezone-naive')
 501
 502        if not self.use_cache and self.resume:
 503            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
 504
 505        # Check for obvious dry-run responses
 506        if self.dry_run and (not self.use_cache or not self.resume):
 507            raise ArvPutUploadIsPending()
 508
 509        # Load cached data if any and if needed
 510        self._setup_state(update_collection)
 511
 512        # Build the upload file list, excluding requested files and counting the
 513        # bytes expected to be uploaded.
 514        self._build_upload_list()
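
    # Minimal usage sketch (assumes a working Arvados configuration; the path
    # is illustrative):
    #
    #   writer = ArvPutUploadJob(paths=['./data'])
    #   writer.start(save_collection=True)
    #   print(writer.manifest_locator())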
 515
 516    def _build_upload_list(self):
 517        """
 518        Scan the requested paths to count file sizes, excluding requested files
 519        and dirs and building the upload file list.
 520        """
  521        # If there aren't any special files to be read, reset the total bytes count
  522        # to zero to start counting.
 523        if not any([p for p in self.paths
 524                    if not (os.path.isfile(p) or os.path.isdir(p))]):
 525            self.bytes_expected = 0
 526
 527        for path in self.paths:
  528            # Test for stdin first, in case some file named '-' exists
 529            if path == '-':
 530                if self.dry_run:
 531                    raise ArvPutUploadIsPending()
 532                self._write_stdin(self.filename or 'stdin')
 533            elif not os.path.exists(path):
  534                raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
 535            elif (not self.follow_links) and os.path.islink(path):
 536                self.logger.warning("Skipping symlink '{}'".format(path))
 537                continue
 538            elif os.path.isdir(path):
 539                # Use absolute paths on cache index so CWD doesn't interfere
 540                # with the caching logic.
 541                orig_path = path
 542                path = os.path.abspath(path)
 543                if orig_path[-1:] == os.sep:
 544                    # When passing a directory reference with a trailing slash,
 545                    # its contents should be uploaded directly to the
 546                    # collection's root.
 547                    prefixdir = path
 548                else:
 549                    # When passing a directory reference with no trailing slash,
  550                    # upload the directory itself into the collection's root.
 551                    prefixdir = os.path.dirname(path)
 552                prefixdir += os.sep
 553                for root, dirs, files in os.walk(path,
 554                                                 followlinks=self.follow_links):
 555                    root_relpath = os.path.relpath(root, path)
 556                    if root_relpath == '.':
 557                        root_relpath = ''
 558                    # Exclude files/dirs by full path matching pattern
 559                    if self.exclude_paths:
 560                        dirs[:] = [d for d in dirs
 561                                   if not any(pathname_match(
 562                                           os.path.join(root_relpath, d), pat)
 563                                              for pat in self.exclude_paths)]
 564                        files = [f for f in files
 565                                 if not any(pathname_match(
 566                                         os.path.join(root_relpath, f), pat)
 567                                            for pat in self.exclude_paths)]
 568                    # Exclude files/dirs by name matching pattern
 569                    if self.exclude_names is not None:
 570                        dirs[:] = [d for d in dirs
 571                                   if not self.exclude_names.match(d)]
 572                        files = [f for f in files
 573                                 if not self.exclude_names.match(f)]
 574                    # Make os.walk()'s dir traversing order deterministic
 575                    dirs.sort()
 576                    files.sort()
 577                    for f in files:
 578                        filepath = os.path.join(root, f)
 579                        if not os.path.isfile(filepath):
 580                            self.logger.warning("Skipping non-regular file '{}'".format(filepath))
 581                            continue
 582                        # Add its size to the total bytes count (if applicable)
 583                        if self.follow_links or (not os.path.islink(filepath)):
 584                            if self.bytes_expected is not None:
 585                                self.bytes_expected += os.path.getsize(filepath)
 586                        self._check_file(filepath,
 587                                         os.path.join(root[len(prefixdir):], f))
 588            else:
 589                filepath = os.path.abspath(path)
 590                # Add its size to the total bytes count (if applicable)
 591                if self.follow_links or (not os.path.islink(filepath)):
 592                    if self.bytes_expected is not None:
 593                        self.bytes_expected += os.path.getsize(filepath)
 594                self._check_file(filepath,
 595                                 self.filename or os.path.basename(path))
  596        # If dry-run mode is on and we got up to this point, then we should notify
  597        # that there aren't any files to upload.
 598        if self.dry_run:
 599            raise ArvPutUploadNotPending()
 600        # Remove local_collection's files that don't exist locally anymore, so the
 601        # bytes_written count is correct.
 602        for f in self.collection_file_paths(self._local_collection,
 603                                            path_prefix=""):
  604            if f != 'stdin' and f != self.filename and f not in self._file_paths:
 605                self._local_collection.remove(f)
 606
 607    def start(self, save_collection):
 608        """
 609        Start supporting thread & file uploading
 610        """
 611        self._checkpointer.start()
 612        try:
 613            # Update bytes_written from current local collection and
 614            # report initial progress.
 615            self._update()
 616            # Actual file upload
 617            self._upload_started = True # Used by the update thread to start checkpointing
 618            self._upload_files()
 619        except (SystemExit, Exception) as e:
 620            self._checkpoint_before_quit = False
 621            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
 622            # Note: We're expecting SystemExit instead of
 623            # KeyboardInterrupt because we have a custom signal
  624            # handler in place that raises SystemExit with the caught
 625            # signal's code.
 626            if isinstance(e, PathDoesNotExistError):
 627                # We aren't interested in the traceback for this case
 628                pass
 629            elif not isinstance(e, SystemExit) or e.code != -2:
 630                self.logger.warning("Abnormal termination:\n{}".format(
 631                    traceback.format_exc()))
 632            raise
 633        finally:
 634            if not self.dry_run:
 635                # Stop the thread before doing anything else
 636                self._stop_checkpointer.set()
 637                self._checkpointer.join()
 638                if self._checkpoint_before_quit:
 639                    # Commit all pending blocks & one last _update()
 640                    self._local_collection.manifest_text()
 641                    self._update(final=True)
 642                    if save_collection:
 643                        self.save_collection()
 644            if self.use_cache:
 645                self._cache_file.close()
 646
 647    def _collection_trash_at(self):
 648        """
 649        Returns the trash date that the collection should use at save time.
 650        Takes into account absolute/relative trash_at values requested
 651        by the user.
 652        """
 653        if type(self._trash_at) == datetime.timedelta:
 654            # Get an absolute datetime for trash_at
 655            return datetime.datetime.utcnow() + self._trash_at
 656        return self._trash_at
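
    # For example, '--trash-after 7' stores datetime.timedelta(days=7) in
    # self._trash_at, so the collection's trash date becomes utcnow() + 7 days
    # at save time.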
 657
 658    def save_collection(self):
 659        if self.update:
 660            # Check if files should be updated on the remote collection.
 661            for fp in self._file_paths:
 662                remote_file = self._remote_collection.find(fp)
 663                if not remote_file:
  664                    # File doesn't exist in the remote collection; copy it.
 665                    self._remote_collection.copy(fp, fp, self._local_collection)
 666                elif remote_file != self._local_collection.find(fp):
  667                    # A different file exists in the remote collection; overwrite it.
 668                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
 669                else:
  670                    # The file already exists in the remote collection; skip it.
 671                    pass
 672            self._remote_collection.save(num_retries=self.num_retries,
 673                                         trash_at=self._collection_trash_at())
 674        else:
 675            if len(self._local_collection) == 0:
 676                self.logger.warning("No files were uploaded, skipping collection creation.")
 677                return
 678            self._local_collection.save_new(
 679                name=self.name, owner_uuid=self.owner_uuid,
 680                ensure_unique_name=self.ensure_unique_name,
 681                num_retries=self.num_retries,
 682                trash_at=self._collection_trash_at())
 683
 684    def destroy_cache(self):
 685        if self.use_cache:
 686            try:
 687                os.unlink(self._cache_filename)
 688            except OSError as error:
 689                # That's what we wanted anyway.
 690                if error.errno != errno.ENOENT:
 691                    raise
 692            self._cache_file.close()
 693
 694    def _collection_size(self, collection):
 695        """
 696        Recursively get the total size of the collection
 697        """
 698        size = 0
 699        for item in listvalues(collection):
 700            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
 701                size += self._collection_size(item)
 702            else:
 703                size += item.size()
 704        return size
 705
 706    def _update_task(self):
 707        """
 708        Periodically called support task. File uploading is
 709        asynchronous so we poll status from the collection.
 710        """
 711        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
 712            self._update()
 713
 714    def _update(self, final=False):
 715        """
 716        Update cached manifest text and report progress.
 717        """
 718        if self._upload_started:
 719            with self._collection_lock:
 720                self.bytes_written = self._collection_size(self._local_collection)
 721                if self.use_cache:
 722                    if final:
 723                        manifest = self._local_collection.manifest_text()
 724                    else:
  725                        # Get the manifest text without committing pending blocks
 726                        manifest = self._local_collection.manifest_text(strip=False,
 727                                                                        normalize=False,
 728                                                                        only_committed=True)
 729                    # Update cache
 730                    with self._state_lock:
 731                        self._state['manifest'] = manifest
 732            if self.use_cache:
 733                try:
 734                    self._save_state()
 735                except Exception as e:
 736                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
 737            # Keep remote collection's trash_at attribute synced when using relative expire dates
 738            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
 739                try:
 740                    self._api_client.collections().update(
 741                        uuid=self._remote_collection.manifest_locator(),
 742                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
 743                    ).execute(num_retries=self.num_retries)
 744                except Exception as e:
 745                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
 746        else:
 747            self.bytes_written = self.bytes_skipped
 748        # Call the reporter, if any
 749        self.report_progress()
 750
 751    def report_progress(self):
 752        if self.reporter is not None:
 753            self.reporter(self.bytes_written, self.bytes_expected)
 754
 755    def _write_stdin(self, filename):
 756        output = self._local_collection.open(filename, 'wb')
 757        self._write(sys.stdin.buffer, output)
 758        output.close()
 759
 760    def _check_file(self, source, filename):
 761        """
 762        Check if this file needs to be uploaded
 763        """
 764        # Ignore symlinks when requested
 765        if (not self.follow_links) and os.path.islink(source):
 766            return
 767        resume_offset = 0
 768        should_upload = False
 769        new_file_in_cache = False
 770        # Record file path for updating the remote collection before exiting
 771        self._file_paths.add(filename)
 772
 773        with self._state_lock:
  774            # If there's no previously cached data for this file, store it for an
  775            # eventual repeated run.
 776            if source not in self._state['files']:
 777                self._state['files'][source] = {
 778                    'mtime': os.path.getmtime(source),
 779                    'size' : os.path.getsize(source)
 780                }
 781                new_file_in_cache = True
 782            cached_file_data = self._state['files'][source]
 783
 784        # Check if file was already uploaded (at least partially)
 785        file_in_local_collection = self._local_collection.find(filename)
 786
 787        # If not resuming, upload the full file.
 788        if not self.resume:
 789            should_upload = True
 790        # New file detected from last run, upload it.
 791        elif new_file_in_cache:
 792            should_upload = True
 793        # Local file didn't change from last run.
 794        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
 795            if not file_in_local_collection:
 796                # File not uploaded yet, upload it completely
 797                should_upload = True
 798            elif file_in_local_collection.permission_expired():
 799                # Permission token expired, re-upload file. This will change whenever
  800                # we have an API for refreshing tokens.
 801                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
 802                should_upload = True
 803                self._local_collection.remove(filename)
 804            elif cached_file_data['size'] == file_in_local_collection.size():
 805                # File already there, skip it.
 806                self.bytes_skipped += cached_file_data['size']
 807            elif cached_file_data['size'] > file_in_local_collection.size():
 808                # File partially uploaded, resume!
 809                resume_offset = file_in_local_collection.size()
 810                self.bytes_skipped += resume_offset
 811                should_upload = True
 812            else:
 813                # Inconsistent cache, re-upload the file
 814                should_upload = True
 815                self._local_collection.remove(filename)
 816                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
 817        # Local file differs from cached data, re-upload it.
 818        else:
 819            if file_in_local_collection:
 820                self._local_collection.remove(filename)
 821            should_upload = True
 822
 823        if should_upload:
 824            try:
 825                self._files_to_upload.append((source, resume_offset, filename))
 826            except ArvPutUploadIsPending:
  827                # This could happen when running in dry-run mode; close the cache file to
 828                # avoid locking issues.
 829                self._cache_file.close()
 830                raise
 831
 832    def _upload_files(self):
 833        for source, resume_offset, filename in self._files_to_upload:
 834            with open(source, 'rb') as source_fd:
 835                with self._state_lock:
 836                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
 837                    self._state['files'][source]['size'] = os.path.getsize(source)
 838                if resume_offset > 0:
 839                    # Start upload where we left off
 840                    output = self._local_collection.open(filename, 'ab')
 841                    source_fd.seek(resume_offset)
 842                else:
 843                    # Start from scratch
 844                    output = self._local_collection.open(filename, 'wb')
 845                self._write(source_fd, output)
 846                output.close(flush=False)
 847
 848    def _write(self, source_fd, output):
 849        while True:
 850            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
 851            if not data:
 852                break
 853            output.write(data)
 854
 855    def _my_collection(self):
 856        return self._remote_collection if self.update else self._local_collection
 857
 858    def _get_cache_filepath(self):
 859        # Set up cache file name from input paths.
 860        md5 = hashlib.md5()
 861        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
 862        realpaths = sorted(os.path.realpath(path) for path in self.paths)
 863        md5.update(b'\0'.join([p.encode() for p in realpaths]))
 864        if self.filename:
 865            md5.update(self.filename.encode())
 866        cache_filename = md5.hexdigest()
 867        cache_filepath = os.path.join(
 868            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
 869            cache_filename)
 870        return cache_filepath
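
    # The resulting path looks like (the digest is hypothetical):
    #   ~/.cache/arvados/arv-put/3b83ef96387f14655fc854ddc3c6bd57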
 871
 872    def _setup_state(self, update_collection):
 873        """
 874        Create a new cache file or load a previously existing one.
 875        """
 876        # Load an already existing collection for update
 877        if update_collection and re.match(arvados.util.collection_uuid_pattern,
 878                                          update_collection):
 879            try:
 880                self._remote_collection = arvados.collection.Collection(
 881                    update_collection,
 882                    api_client=self._api_client,
 883                    storage_classes_desired=self.storage_classes,
 884                    num_retries=self.num_retries)
 885            except arvados.errors.ApiError as error:
 886                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
 887            else:
 888                self.update = True
 889        elif update_collection:
 890            # Collection locator provided, but unknown format
 891            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 892
 893        if self.use_cache:
 894            cache_filepath = self._get_cache_filepath()
 895            if self.resume and os.path.exists(cache_filepath):
 896                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
 897                self._cache_file = open(cache_filepath, 'a+')
 898            else:
  899                # --no-resume means start with an empty cache file.
 900                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
 901                self._cache_file = open(cache_filepath, 'w+')
 902            self._cache_filename = self._cache_file.name
 903            self._lock_file(self._cache_file)
 904            self._cache_file.seek(0)
 905
 906        with self._state_lock:
 907            if self.use_cache:
 908                try:
 909                    self._state = json.load(self._cache_file)
 910                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
 911                        # Cache at least partially incomplete, set up new cache
 912                        self._state = copy.deepcopy(self.EMPTY_STATE)
 913                except ValueError:
 914                    # Cache file empty, set up new cache
 915                    self._state = copy.deepcopy(self.EMPTY_STATE)
 916            else:
 917                self.logger.info("No cache usage requested for this run.")
 918                # No cache file, set empty state
 919                self._state = copy.deepcopy(self.EMPTY_STATE)
 920            if not self._cached_manifest_valid():
 921                if not self.batch_mode:
 922                    raise ResumeCacheInvalidError()
 923                else:
  924                    self.logger.info("Invalid signatures on cache file '{}' while running in 'batch mode' -- continuing anyway.".format(self._cache_file.name))
 925                    self.use_cache = False # Don't overwrite preexisting cache file.
 926                    self._state = copy.deepcopy(self.EMPTY_STATE)
 927            # Load the previous manifest so we can check if files were modified remotely.
 928            self._local_collection = arvados.collection.Collection(
 929                self._state['manifest'],
 930                replication_desired=self.replication_desired,
 931                storage_classes_desired=self.storage_classes,
 932                put_threads=self.put_threads,
 933                api_client=self._api_client,
 934                num_retries=self.num_retries)
 935
 936    def _cached_manifest_valid(self):
 937        """
  938        Validate the oldest non-expired block signature to check whether the cached
  939        manifest is usable, i.e. that it was not created with a different
  940        Arvados account.
 941        """
 942        if self._state.get('manifest', None) is None:
 943            # No cached manifest yet, all good.
 944            return True
 945        now = datetime.datetime.utcnow()
 946        oldest_exp = None
 947        oldest_loc = None
 948        block_found = False
 949        for m in keep_locator_pattern.finditer(self._state['manifest']):
 950            loc = m.group(0)
 951            try:
 952                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
 953            except IndexError:
 954                # Locator without signature
 955                continue
 956            block_found = True
 957            if exp > now and (oldest_exp is None or exp < oldest_exp):
 958                oldest_exp = exp
 959                oldest_loc = loc
 960        if not block_found:
 961            # No block signatures found => no invalid block signatures.
 962            return True
 963        if oldest_loc is None:
 964            # Locator signatures found, but all have expired.
 965            # Reset the cache and move on.
 966            self.logger.info('Cache expired, starting from scratch.')
 967            self._state['manifest'] = ''
 968            return True
 969        kc = arvados.KeepClient(api_client=self._api_client,
 970                                num_retries=self.num_retries)
 971        try:
 972            kc.head(oldest_loc)
 973        except arvados.errors.KeepRequestError:
 974            # Something is wrong, cached manifest is not valid.
 975            return False
 976        return True
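
    # Signed locators embed their expiry as a hex UNIX timestamp after '@', e.g.
    # (illustrative) 'acbd18db4cc2f85cedef654fccc4a4d8+3+A0123...ef@5f612ab0':
    # int('5f612ab0', 16) is the expiration time checked above.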
 977
 978    def collection_file_paths(self, col, path_prefix='.'):
  979        """Return a list of file paths by recursively going through the entire collection `col`"""
 980        file_paths = []
 981        for name, item in listitems(col):
 982            if isinstance(item, arvados.arvfile.ArvadosFile):
 983                file_paths.append(os.path.join(path_prefix, name))
 984            elif isinstance(item, arvados.collection.Subcollection):
 985                new_prefix = os.path.join(path_prefix, name)
 986                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
 987        return file_paths
 988
 989    def _lock_file(self, fileobj):
 990        try:
 991            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
 992        except IOError:
 993            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
 994
 995    def _save_state(self):
 996        """
 997        Atomically save current state into cache.
 998        """
 999        with self._state_lock:
1000            # We're not using copy.deepcopy() here because it's a lot slower
 1001            # than json.dumps(), and we already need the JSON format to be
 1002            # saved on disk.
1003            state = json.dumps(self._state)
1004        try:
1005            new_cache = tempfile.NamedTemporaryFile(
1006                mode='w+',
1007                dir=os.path.dirname(self._cache_filename), delete=False)
1008            self._lock_file(new_cache)
1009            new_cache.write(state)
1010            new_cache.flush()
1011            os.fsync(new_cache)
1012            os.rename(new_cache.name, self._cache_filename)
1013        except (IOError, OSError, ResumeCacheConflict) as error:
1014            self.logger.error("There was a problem while saving the cache file: {}".format(error))
1015            try:
 1016                os.unlink(new_cache.name)
 1017            except NameError:  # NamedTemporaryFile() failed.
1018                pass
1019        else:
1020            self._cache_file.close()
1021            self._cache_file = new_cache
1022
1023    def collection_name(self):
1024        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1025
1026    def collection_trash_at(self):
1027        return self._my_collection().get_trash_at()
1028
1029    def manifest_locator(self):
1030        return self._my_collection().manifest_locator()
1031
1032    def portable_data_hash(self):
1033        pdh = self._my_collection().portable_data_hash()
1034        m = self._my_collection().stripped_manifest().encode()
1035        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1036        if pdh != local_pdh:
1037            self.logger.warning("\n".join([
1038                "arv-put: API server provided PDH differs from local manifest.",
1039                "         This should not happen; showing API server version."]))
1040        return pdh
1041
1042    def manifest_text(self, stream_name=".", strip=False, normalize=False):
1043        return self._my_collection().manifest_text(stream_name, strip, normalize)
1044
1045    def _datablocks_on_item(self, item):
1046        """
1047        Return a list of datablock locators, recursively navigating
1048        through subcollections
1049        """
1050        if isinstance(item, arvados.arvfile.ArvadosFile):
1051            if item.size() == 0:
1052                # Empty file locator
1053                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1054            else:
1055                locators = []
1056                for segment in item.segments():
1057                    loc = segment.locator
1058                    locators.append(loc)
1059                return locators
1060        elif isinstance(item, arvados.collection.Collection):
1061            l = [self._datablocks_on_item(x) for x in listvalues(item)]
1062            # Fast list flattener method taken from:
1063            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1064            return [loc for sublist in l for loc in sublist]
1065        else:
1066            return None
1067
1068    def data_locators(self):
1069        with self._collection_lock:
1070            # Make sure all datablocks are flushed before getting the locators
1071            self._my_collection().manifest_text()
1072            datablocks = self._datablocks_on_item(self._my_collection())
1073        return datablocks
1074
1075_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1076                                                            os.getpid())
1077
1078# Simulate glob.glob() matching behavior without the need to scan the filesystem
1079# Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1080# pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1081# so instead we're using it on every path component.
1082def pathname_match(pathname, pattern):
1083    name = pathname.split(os.sep)
1084    # Fix patterns like 'some/subdir/' or 'some//subdir'
1085    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1086    if len(name) != len(pat):
1087        return False
1088    for i in range(len(name)):
1089        if not fnmatch.fnmatch(name[i], pat[i]):
1090            return False
1091    return True
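
# Illustrative matching behavior:
#   pathname_match('subdir/file.txt', 'subdir/*.txt')         -> True
#   pathname_match('subdir/nested/file.txt', 'subdir/*.txt')  -> False (depth differs)
#   pathname_match('exclude_this.gif', './exclude_this.gif')  -> True ('.' components are dropped)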
1092
1093def machine_progress(bytes_written, bytes_expected):
1094    return _machine_format.format(
1095        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1096
1097def human_progress(bytes_written, bytes_expected):
1098    if bytes_expected:
1099        return "\r{}M / {}M {:.1%} ".format(
1100            bytes_written >> 20, bytes_expected >> 20,
1101            float(bytes_written) / bytes_expected)
1102    else:
1103        return "\r{} ".format(bytes_written)
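
# Sample reporter output (pid and byte counts are illustrative):
#   machine_progress(512, 1024)       -> '<argv0> <pid>: 512 written 1024 total\n'
#   human_progress(1048576, 2097152)  -> '\r1M / 2M 50.0% '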
1104
1105def progress_writer(progress_func, outfile=sys.stderr):
1106    def write_progress(bytes_written, bytes_expected):
1107        outfile.write(progress_func(bytes_written, bytes_expected))
1108    return write_progress
1109
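# Given '' or None, return the current user's UUID (their Home project); given
# a user or group UUID, verify it exists and return it.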
1110def desired_project_uuid(api_client, project_uuid, num_retries):
1111    if not project_uuid:
1112        query = api_client.users().current()
1113    elif arvados.util.user_uuid_pattern.match(project_uuid):
1114        query = api_client.users().get(uuid=project_uuid)
1115    elif arvados.util.group_uuid_pattern.match(project_uuid):
1116        query = api_client.groups().get(uuid=project_uuid)
1117    else:
1118        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1119    return query.execute(num_retries=num_retries)['uuid']
1120
1121def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1122         install_sig_handlers=True):
1123    global api_client
1124
1125    args = parse_arguments(arguments)
1126    logger = logging.getLogger('arvados.arv_put')
1127    if args.silent:
1128        logger.setLevel(logging.WARNING)
1129    else:
1130        logger.setLevel(logging.INFO)
1131    status = 0
1132
1133    request_id = arvados.util.new_request_id()
1134
1135    formatter = ArvPutLogFormatter(request_id)
1136    logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1137
1138    if api_client is None:
1139        api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
1140
1141    if install_sig_handlers:
1142        arv_cmd.install_signal_handlers()
1143
1144    # Trash arguments validation
1145    trash_at = None
1146    if args.trash_at is not None:
1147        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1148        # make sure the user provides a complete YYYY-MM-DD date.
1149        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1150            logger.error("--trash-at argument format invalid, use --help to see examples.")
1151            sys.exit(1)
1152        # Check if no time information was provided. In that case, assume end-of-day.
1153        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1154            args.trash_at += 'T23:59:59'
1155        try:
1156            trash_at = ciso8601.parse_datetime(args.trash_at)
1157        except:
1158            logger.error("--trash-at argument format invalid, use --help to see examples.")
1159            sys.exit(1)
1160        else:
1161            if trash_at.tzinfo is not None:
1162                # Timezone aware datetime provided.
1163                utcoffset = -trash_at.utcoffset()
1164            else:
 1165                # Timezone naive datetime provided. Assume it is local.
1166                if time.daylight:
1167                    utcoffset = datetime.timedelta(seconds=time.altzone)
1168                else:
1169                    utcoffset = datetime.timedelta(seconds=time.timezone)
1170            # Convert to UTC timezone naive datetime.
1171            trash_at = trash_at.replace(tzinfo=None) + utcoffset
1172
1173        if trash_at <= datetime.datetime.utcnow():
1174            logger.error("--trash-at argument must be set in the future")
1175            sys.exit(1)
1176    if args.trash_after is not None:
1177        if args.trash_after < 1:
1178            logger.error("--trash-after argument must be >= 1")
1179            sys.exit(1)
1180        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1181
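    # Example: --trash-at 2009-01-03 is completed to '2009-01-03T23:59:59',
    # interpreted in the local timezone, and converted to a timezone-naive UTC
    # datetime before being passed to ArvPutUploadJob.
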
1182    # Determine the name to use
1183    if args.name:
1184        if args.stream or args.raw:
1185            logger.error("Cannot use --name with --stream or --raw")
1186            sys.exit(1)
1187        elif args.update_collection:
1188            logger.error("Cannot use --name with --update-collection")
1189            sys.exit(1)
1190        collection_name = args.name
1191    else:
1192        collection_name = "Saved at {} by {}@{}".format(
1193            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1194            pwd.getpwuid(os.getuid()).pw_name,
1195            socket.gethostname())
1196
1197    if args.project_uuid and (args.stream or args.raw):
1198        logger.error("Cannot use --project-uuid with --stream or --raw")
1199        sys.exit(1)
1200
1201    # Determine the parent project
1202    try:
1203        project_uuid = desired_project_uuid(api_client, args.project_uuid,
1204                                            args.retries)
1205    except (apiclient_errors.Error, ValueError) as error:
1206        logger.error(error)
1207        sys.exit(1)
1208
1209    if args.progress:
1210        reporter = progress_writer(human_progress)
1211    elif args.batch_progress:
1212        reporter = progress_writer(machine_progress)
1213    else:
1214        reporter = None
1215
1216    #  Split storage-classes argument
1217    storage_classes = None
1218    if args.storage_classes:
1219        storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
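        # e.g. --storage-classes 'hot, archival' yields ['hot', 'archival'].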
1220
1221    # Setup exclude regex from all the --exclude arguments provided
1222    name_patterns = []
1223    exclude_paths = []
1224    exclude_names = None
1225    if len(args.exclude) > 0:
1226        # We're supporting 2 kinds of exclusion patterns:
1227        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1228        #                            the name, wherever the file is on the tree)
1229        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1230        #                            entire path, and should be relative to
1231        #                            any input dir argument)
1232        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1233        #                            placed directly underneath the input dir)
1234        for p in args.exclude:
1235            # Only relative paths patterns allowed
1236            if p.startswith(os.sep):
1237                logger.error("Cannot use absolute paths with --exclude")
1238                sys.exit(1)
1239            if os.path.dirname(p):
 1240                # We don't support path patterns with '..'
1241                p_parts = p.split(os.sep)
1242                if '..' in p_parts:
1243                    logger.error(
 1244                        "Cannot use path patterns that include '..'")
1245                    sys.exit(1)
1246                # Path search pattern
1247                exclude_paths.append(p)
1248            else:
1249                # Name-only search pattern
1250                name_patterns.append(p)
1251        # For name only matching, we can combine all patterns into a single
1252        # regexp, for better performance.
1253        exclude_names = re.compile('|'.join(
1254            [fnmatch.translate(p) for p in name_patterns]
1255        )) if len(name_patterns) > 0 else None
1256        # Show the user the patterns to be used, just in case they weren't
1257        # specified inside quotes and got changed by the shell expansion.
1258        logger.info("Exclude patterns: {}".format(args.exclude))
1259
1260    # If this is used by a human, and there's at least one directory to be
1261    # uploaded, the expected bytes calculation can take a moment.
1262    if args.progress and any([os.path.isdir(f) for f in args.paths]):
1263        logger.info("Calculating upload size, this could take some time...")
1264    try:
1265        writer = ArvPutUploadJob(paths = args.paths,
1266                                 resume = args.resume,
1267                                 use_cache = args.use_cache,
1268                                 batch_mode= args.batch,
1269                                 filename = args.filename,
1270                                 reporter = reporter,
1271                                 api_client = api_client,
1272                                 num_retries = args.retries,
1273                                 replication_desired = args.replication,
1274                                 put_threads = args.threads,
1275                                 name = collection_name,
1276                                 owner_uuid = project_uuid,
1277                                 ensure_unique_name = True,
1278                                 update_collection = args.update_collection,
1279                                 storage_classes=storage_classes,
1280                                 logger=logger,
1281                                 dry_run=args.dry_run,
1282                                 follow_links=args.follow_links,
1283                                 exclude_paths=exclude_paths,
1284                                 exclude_names=exclude_names,
1285                                 trash_at=trash_at)
1286    except ResumeCacheConflict:
1287        logger.error("\n".join([
1288            "arv-put: Another process is already uploading this data.",
1289            "         Use --no-cache if this is really what you want."]))
1290        sys.exit(1)
1291    except ResumeCacheInvalidError:
1292        logger.error("\n".join([
1293            "arv-put: Resume cache contains invalid signature: it may have expired",
1294            "         or been created with another Arvados user's credentials.",
1295            "         Switch user or use one of the following options to restart upload:",
1296            "         --no-resume to start a new resume cache.",
1297            "         --no-cache to disable resume cache.",
1298            "         --batch to ignore the resume cache if invalid."]))
1299        sys.exit(1)
1300    except (CollectionUpdateError, PathDoesNotExistError) as error:
1301        logger.error("\n".join([
1302            "arv-put: %s" % str(error)]))
1303        sys.exit(1)
1304    except ArvPutUploadIsPending:
1305        # Dry run check successful, return proper exit code.
1306        sys.exit(2)
1307    except ArvPutUploadNotPending:
1308        # No files pending for upload
1309        sys.exit(0)
1310
1311    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1312        logger.warning("\n".join([
1313            "arv-put: Resuming previous upload from last checkpoint.",
1314            "         Use the --no-resume option to start over."]))
1315
1316    if not args.dry_run:
1317        writer.report_progress()
1318    output = None
1319    try:
1320        writer.start(save_collection=not(args.stream or args.raw))
1321    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
1322        logger.error("\n".join([
1323            "arv-put: %s" % str(error)]))
1324        sys.exit(1)
1325
1326    if args.progress:  # Print newline to split stderr from stdout for humans.
1327        logger.info("\n")
1328
1329    if args.stream:
1330        if args.normalize:
1331            output = writer.manifest_text(normalize=True)
1332        else:
1333            output = writer.manifest_text()
1334    elif args.raw:
1335        output = ','.join(writer.data_locators())
1336    elif writer.manifest_locator() is not None:
1337        try:
1338            expiration_notice = ""
1339            if writer.collection_trash_at() is not None:
1340                # Get the local timezone-naive version, and log it with timezone information.
1341                if time.daylight:
1342                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1343                else:
1344                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1345                expiration_notice = ". It will expire on {} {}.".format(
1346                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1347            if args.update_collection:
1348                logger.info(u"Collection updated: '{}'{}".format(
1349                    writer.collection_name(), expiration_notice))
1350            else:
1351                logger.info(u"Collection saved as '{}'{}".format(
1352                    writer.collection_name(), expiration_notice))
1353            if args.portable_data_hash:
1354                output = writer.portable_data_hash()
1355            else:
1356                output = writer.manifest_locator()
1357        except apiclient_errors.Error as error:
1358            logger.error(
1359                "arv-put: Error creating Collection on project: {}.".format(
1360                    error))
1361            status = 1
1362    else:
1363        status = 1
1364
1365    # Print the locator (uuid) of the new collection.
1366    if output is None:
1367        status = status or 1
1368    elif not args.silent:
1369        stdout.write(output)
1370        if not output.endswith('\n'):
1371            stdout.write('\n')
1372
1373    if install_sig_handlers:
1374        arv_cmd.restore_signal_handlers()
1375
1376    if status != 0:
1377        sys.exit(status)
1378
1379    # Success!
1380    return output
1381
1382
1383if __name__ == '__main__':
1384    main()
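The exclusion handling in main() above splits --exclude arguments into name-only patterns and relative path patterns. The sketch below reproduces that classification outside of main() for illustration; split_exclude_args is a hypothetical helper written for this example, not part of the module.

import fnmatch
import os
import re

def split_exclude_args(exclude_args):
    # Mirror of main(): name-only patterns are merged into one regexp,
    # path patterns are kept as-is and matched against the relative path.
    name_patterns = []
    exclude_paths = []
    for p in exclude_args:
        if p.startswith(os.sep):
            # main() logs an error and exits here; a ValueError stands in for that.
            raise ValueError("Cannot use absolute paths with --exclude")
        if os.path.dirname(p):
            if '..' in p.split(os.sep):
                raise ValueError("Cannot use path patterns that include '..'")
            exclude_paths.append(p)     # matched against the whole relative path
        else:
            name_patterns.append(p)     # matched against any file/dir name
    exclude_names = re.compile(
        '|'.join(fnmatch.translate(p) for p in name_patterns)) if name_patterns else None
    return exclude_paths, exclude_names

# '*.log' excludes by name anywhere in the tree; 'data/tmp' excludes that path only.
paths, names = split_exclude_args(['*.log', 'data/tmp'])
assert paths == ['data/tmp'] and names.match('debug.log')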
api_client = None
upload_opts = ArgumentParser(prog='pysdk_pdoc.py', usage=None, description=None, formatter_class=<class 'argparse.HelpFormatter'>, conflict_handler='error', add_help=False)
run_opts = ArgumentParser(prog='pysdk_pdoc.py', usage=None, description=None, formatter_class=<class 'argparse.HelpFormatter'>, conflict_handler='error', add_help=False)
arg_parser = ArgumentParser(prog='pysdk_pdoc.py', usage=None, description='Copy data from the local filesystem to Keep.', formatter_class=<class 'argparse.HelpFormatter'>, conflict_handler='error', add_help=True)
def parse_arguments(arguments):
262def parse_arguments(arguments):
263    args = arg_parser.parse_args(arguments)
264
265    if len(args.paths) == 0:
266        args.paths = ['-']
267
268    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
269
270    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
271        arg_parser.error("""
272    --filename argument cannot be used when storing a directory or
273    multiple files.
274    """)
275
276    # Turn on --progress by default if stderr is a tty.
277    if (not (args.batch_progress or args.no_progress or args.silent)
278        and os.isatty(sys.stderr.fileno())):
279        args.progress = True
280
281    # Turn off --resume (default) if --no-cache is used.
282    if not args.use_cache:
283        args.resume = False
284
285    if args.paths == ['-']:
286        if args.update_collection:
287            arg_parser.error("""
288    --update-collection cannot be used when reading from stdin.
289    """)
290        args.resume = False
291        args.use_cache = False
292        if not args.filename:
293            args.filename = 'stdin'
294
295    # Remove possible duplicated patterns
296    if len(args.exclude) > 0:
297        args.exclude = list(set(args.exclude))
298
299    return args
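A minimal illustration of the post-processing above, assuming the module is importable as arvados.commands.put and that --exclude may be given more than once (as main()'s handling of args.exclude suggests):

from arvados.commands.put import parse_arguments

# No paths: read from stdin, name the file 'stdin', and disable the resume cache.
args = parse_arguments([])
assert args.paths == ['-']
assert args.filename == 'stdin'
assert args.use_cache is False and args.resume is False

# '/dev/stdin' is normalized to '-', and duplicate --exclude patterns are dropped.
args = parse_arguments(['--exclude', '*.tmp', '--exclude', '*.tmp', '/dev/stdin'])
assert args.paths == ['-'] and args.exclude == ['*.tmp']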
class PathDoesNotExistError(builtins.Exception):
302class PathDoesNotExistError(Exception):
303    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class CollectionUpdateError(builtins.Exception):
306class CollectionUpdateError(Exception):
307    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class ResumeCacheConflict(builtins.Exception):
310class ResumeCacheConflict(Exception):
311    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class ResumeCacheInvalidError(builtins.Exception):
314class ResumeCacheInvalidError(Exception):
315    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class ArvPutArgumentConflict(builtins.Exception):
317class ArvPutArgumentConflict(Exception):
318    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class ArvPutUploadIsPending(builtins.Exception):
321class ArvPutUploadIsPending(Exception):
322    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class ArvPutUploadNotPending(builtins.Exception):
325class ArvPutUploadNotPending(Exception):
326    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class FileUploadList(builtins.list):
329class FileUploadList(list):
330    def __init__(self, dry_run=False):
331        list.__init__(self)
332        self.dry_run = dry_run
333
334    def append(self, other):
335        if self.dry_run:
336            raise ArvPutUploadIsPending()
337        super(FileUploadList, self).append(other)

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

FileUploadList(dry_run=False)
330    def __init__(self, dry_run=False):
331        list.__init__(self)
332        self.dry_run = dry_run
dry_run
def append(self, other):
334    def append(self, other):
335        if self.dry_run:
336            raise ArvPutUploadIsPending()
337        super(FileUploadList, self).append(other)

Append object to the end of the list.

Inherited Members
builtins.list
clear
copy
insert
extend
pop
remove
index
count
reverse
sort
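This list subclass is the mechanism behind --dry-run: the first attempt to queue a file aborts the scan. A small sketch (the tuple mirrors what _check_file() appends; the path is a placeholder):

files = FileUploadList(dry_run=True)
try:
    files.append(('/tmp/example.txt', 0, 'example.txt'))   # (source, resume_offset, filename)
except ArvPutUploadIsPending:
    print("dry run: at least one file would be uploaded")  # main() exits with code 2 here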
class ArvPutLogFormatter(logging.Formatter):
341class ArvPutLogFormatter(logging.Formatter):
342    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
343    err_fmtr = None
344    request_id_informed = False
345
346    def __init__(self, request_id):
347        self.err_fmtr = logging.Formatter(
348            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
349            arvados.log_date_format)
350
351    def format(self, record):
352        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
353            self.request_id_informed = True
354            return self.err_fmtr.format(record)
355        return self.std_fmtr.format(record)

Formatter instances are used to convert a LogRecord to text.

Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, “%(message)s”, “{message}”, or “${message}”, is used.

The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user’s message and arguments are pre- formatted into a LogRecord’s message attribute. Currently, the useful attributes in a LogRecord are described by:

%(name)s: Name of the logger (logging channel)
%(levelno)s: Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL)
%(levelname)s: Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
%(pathname)s: Full pathname of the source file where the logging call was issued (if available)
%(filename)s: Filename portion of pathname
%(module)s: Module (name portion of filename)
%(lineno)d: Source line number where the logging call was issued (if available)
%(funcName)s: Function name
%(created)f: Time when the LogRecord was created (time.time() return value)
%(asctime)s: Textual time when the LogRecord was created
%(msecs)d: Millisecond portion of the creation time
%(relativeCreated)d: Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time)
%(thread)d: Thread ID (if available)
%(threadName)s: Thread name (if available)
%(process)d: Process ID (if available)
%(message)s: The result of record.getMessage(), computed just as the record is emitted

ArvPutLogFormatter(request_id)
346    def __init__(self, request_id):
347        self.err_fmtr = logging.Formatter(
348            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
349            arvados.log_date_format)

Initialize the formatter with specified format strings.

Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.

Use a style parameter of ‘%’, ‘{’ or ‘$’ to specify that you want to use one of %-formatting, str.format() ({}) formatting or string.Template formatting in your format string.

Changed in version 3.2: Added the style parameter.

std_fmtr = <logging.Formatter object>
err_fmtr = None
request_id_informed = False
def format(self, record):
351    def format(self, record):
352        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
353            self.request_id_informed = True
354            return self.err_fmtr.format(record)
355        return self.std_fmtr.format(record)

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

Inherited Members
logging.Formatter
converter
datefmt
default_time_format
default_msec_format
formatTime
formatException
usesTime
formatMessage
formatStack
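A sketch of wiring this formatter to a handler; the request id is a placeholder value:

import logging

handler = logging.StreamHandler()
handler.setFormatter(ArvPutLogFormatter(request_id='req-0123456789abcdefghijklmno'))
log = logging.getLogger('arvados.arv_put')
log.addHandler(handler)

log.warning("formatted with the standard format")
log.error("the first ERROR/DEBUG record also carries the X-Request-Id suffix")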
class ResumeCache:
358class ResumeCache(object):
359    CACHE_DIR = '.cache/arvados/arv-put'
360
361    def __init__(self, file_spec):
362        self.cache_file = open(file_spec, 'a+')
363        self._lock_file(self.cache_file)
364        self.filename = self.cache_file.name
365
366    @classmethod
367    def make_path(cls, args):
368        md5 = hashlib.md5()
369        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
370        realpaths = sorted(os.path.realpath(path) for path in args.paths)
371        md5.update(b'\0'.join([p.encode() for p in realpaths]))
372        if any(os.path.isdir(path) for path in realpaths):
373            md5.update(b'-1')
374        elif args.filename:
375            md5.update(args.filename.encode())
376        return os.path.join(
377            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
378            md5.hexdigest())
379
380    def _lock_file(self, fileobj):
381        try:
382            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
383        except IOError:
384            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
385
386    def load(self):
387        self.cache_file.seek(0)
388        return json.load(self.cache_file)
389
390    def check_cache(self, api_client=None, num_retries=0):
391        try:
392            state = self.load()
393            locator = None
394            try:
395                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
396                    locator = state["_finished_streams"][0][1][0]
397                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
398                    locator = state["_current_stream_locators"][0]
399                if locator is not None:
400                    kc = arvados.keep.KeepClient(api_client=api_client)
401                    kc.head(locator, num_retries=num_retries)
402            except Exception as e:
403                self.restart()
404        except (ValueError):
405            pass
406
407    def save(self, data):
408        try:
409            new_cache_fd, new_cache_name = tempfile.mkstemp(
410                dir=os.path.dirname(self.filename))
411            self._lock_file(new_cache_fd)
412            new_cache = os.fdopen(new_cache_fd, 'r+')
413            json.dump(data, new_cache)
414            os.rename(new_cache_name, self.filename)
415        except (IOError, OSError, ResumeCacheConflict):
416            try:
417                os.unlink(new_cache_name)
418            except NameError:  # mkstemp failed.
419                pass
420        else:
421            self.cache_file.close()
422            self.cache_file = new_cache
423
424    def close(self):
425        self.cache_file.close()
426
427    def destroy(self):
428        try:
429            os.unlink(self.filename)
430        except OSError as error:
431            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
432                raise
433        self.close()
434
435    def restart(self):
436        self.destroy()
437        self.__init__(self.filename)
ResumeCache(file_spec)
361    def __init__(self, file_spec):
362        self.cache_file = open(file_spec, 'a+')
363        self._lock_file(self.cache_file)
364        self.filename = self.cache_file.name
CACHE_DIR = '.cache/arvados/arv-put'
cache_file
filename
@classmethod
def make_path(cls, args):
366    @classmethod
367    def make_path(cls, args):
368        md5 = hashlib.md5()
369        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
370        realpaths = sorted(os.path.realpath(path) for path in args.paths)
371        md5.update(b'\0'.join([p.encode() for p in realpaths]))
372        if any(os.path.isdir(path) for path in realpaths):
373            md5.update(b'-1')
374        elif args.filename:
375            md5.update(args.filename.encode())
376        return os.path.join(
377            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
378            md5.hexdigest())
def load(self):
386    def load(self):
387        self.cache_file.seek(0)
388        return json.load(self.cache_file)
def check_cache(self, api_client=None, num_retries=0):
390    def check_cache(self, api_client=None, num_retries=0):
391        try:
392            state = self.load()
393            locator = None
394            try:
395                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
396                    locator = state["_finished_streams"][0][1][0]
397                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
398                    locator = state["_current_stream_locators"][0]
399                if locator is not None:
400                    kc = arvados.keep.KeepClient(api_client=api_client)
401                    kc.head(locator, num_retries=num_retries)
402            except Exception as e:
403                self.restart()
404        except (ValueError):
405            pass
def save(self, data):
407    def save(self, data):
408        try:
409            new_cache_fd, new_cache_name = tempfile.mkstemp(
410                dir=os.path.dirname(self.filename))
411            self._lock_file(new_cache_fd)
412            new_cache = os.fdopen(new_cache_fd, 'r+')
413            json.dump(data, new_cache)
414            os.rename(new_cache_name, self.filename)
415        except (IOError, OSError, ResumeCacheConflict):
416            try:
417                os.unlink(new_cache_name)
418            except NameError:  # mkstemp failed.
419                pass
420        else:
421            self.cache_file.close()
422            self.cache_file = new_cache
def close(self):
424    def close(self):
425        self.cache_file.close()
def destroy(self):
427    def destroy(self):
428        try:
429            os.unlink(self.filename)
430        except OSError as error:
431            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
432                raise
433        self.close()
def restart(self):
435    def restart(self):
436        self.destroy()
437        self.__init__(self.filename)
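The flock()-based locking above is what produces arv-put's "Another process is already uploading this data" message. A sketch using a throwaway cache file:

import tempfile

cache_path = tempfile.mktemp()          # throwaway path, for illustration only
first = ResumeCache(cache_path)
first.save({'manifest': None, 'files': {}})
assert first.load() == {'manifest': None, 'files': {}}

try:
    ResumeCache(cache_path)             # a second writer on the same cache file
except ResumeCacheConflict:
    print("cache file is locked by another arv-put process")

first.destroy()                         # unlock and remove the cache file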
class ArvPutUploadJob:
 440class ArvPutUploadJob(object):
 441    CACHE_DIR = '.cache/arvados/arv-put'
 442    EMPTY_STATE = {
 443        'manifest' : None, # Last saved manifest checkpoint
 444        'files' : {} # Previous run file list: {path : {size, mtime}}
 445    }
 446
 447    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
 448                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
 449                 ensure_unique_name=False, num_retries=None,
 450                 put_threads=None, replication_desired=None, filename=None,
 451                 update_time=60.0, update_collection=None, storage_classes=None,
 452                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
 453                 follow_links=True, exclude_paths=[], exclude_names=None,
 454                 trash_at=None):
 455        self.paths = paths
 456        self.resume = resume
 457        self.use_cache = use_cache
 458        self.batch_mode = batch_mode
 459        self.update = False
 460        self.reporter = reporter
 461        # This will be set to 0 before we start counting, if no special files are
 462        # going to be read.
 463        self.bytes_expected = None
 464        self.bytes_written = 0
 465        self.bytes_skipped = 0
 466        self.name = name
 467        self.owner_uuid = owner_uuid
 468        self.ensure_unique_name = ensure_unique_name
 469        self.num_retries = num_retries
 470        self.replication_desired = replication_desired
 471        self.put_threads = put_threads
 472        self.filename = filename
 473        self.storage_classes = storage_classes
 474        self._api_client = api_client
 475        self._state_lock = threading.Lock()
 476        self._state = None # Previous run state (file list & manifest)
 477        self._current_files = [] # Current run file list
 478        self._cache_file = None
 479        self._collection_lock = threading.Lock()
 480        self._remote_collection = None # Collection being updated (if asked)
 481        self._local_collection = None # Collection from previous run manifest
 482        self._file_paths = set() # Files to be updated in remote collection
 483        self._stop_checkpointer = threading.Event()
 484        self._checkpointer = threading.Thread(target=self._update_task)
 485        self._checkpointer.daemon = True
 486        self._update_task_time = update_time  # How many seconds to wait between update runs
 487        self._files_to_upload = FileUploadList(dry_run=dry_run)
 488        self._upload_started = False
 489        self.logger = logger
 490        self.dry_run = dry_run
 491        self._checkpoint_before_quit = True
 492        self.follow_links = follow_links
 493        self.exclude_paths = exclude_paths
 494        self.exclude_names = exclude_names
 495        self._trash_at = trash_at
 496
 497        if self._trash_at is not None:
 498            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
 499                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
 500            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
 501                raise TypeError('provided trash_at datetime should be timezone-naive')
 502
 503        if not self.use_cache and self.resume:
 504            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
 505
 506        # Check for obvious dry-run responses
 507        if self.dry_run and (not self.use_cache or not self.resume):
 508            raise ArvPutUploadIsPending()
 509
 510        # Load cached data if any and if needed
 511        self._setup_state(update_collection)
 512
 513        # Build the upload file list, excluding requested files and counting the
 514        # bytes expected to be uploaded.
 515        self._build_upload_list()
 516
 517    def _build_upload_list(self):
 518        """
 519        Scan the requested paths to count file sizes, excluding requested files
 520        and dirs and building the upload file list.
 521        """
 522        # If there aren't special files to be read, reset total bytes count to zero
 523        # to start counting.
 524        if not any([p for p in self.paths
 525                    if not (os.path.isfile(p) or os.path.isdir(p))]):
 526            self.bytes_expected = 0
 527
 528        for path in self.paths:
 529            # Test for stdin first, in case some file named '-' exists
 530            if path == '-':
 531                if self.dry_run:
 532                    raise ArvPutUploadIsPending()
 533                self._write_stdin(self.filename or 'stdin')
 534            elif not os.path.exists(path):
 535                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
 536            elif (not self.follow_links) and os.path.islink(path):
 537                self.logger.warning("Skipping symlink '{}'".format(path))
 538                continue
 539            elif os.path.isdir(path):
 540                # Use absolute paths on cache index so CWD doesn't interfere
 541                # with the caching logic.
 542                orig_path = path
 543                path = os.path.abspath(path)
 544                if orig_path[-1:] == os.sep:
 545                    # When passing a directory reference with a trailing slash,
 546                    # its contents should be uploaded directly to the
 547                    # collection's root.
 548                    prefixdir = path
 549                else:
 550                    # When passing a directory reference with no trailing slash,
 551                    # upload the directory to the collection's root.
 552                    prefixdir = os.path.dirname(path)
 553                prefixdir += os.sep
 554                for root, dirs, files in os.walk(path,
 555                                                 followlinks=self.follow_links):
 556                    root_relpath = os.path.relpath(root, path)
 557                    if root_relpath == '.':
 558                        root_relpath = ''
 559                    # Exclude files/dirs by full path matching pattern
 560                    if self.exclude_paths:
 561                        dirs[:] = [d for d in dirs
 562                                   if not any(pathname_match(
 563                                           os.path.join(root_relpath, d), pat)
 564                                              for pat in self.exclude_paths)]
 565                        files = [f for f in files
 566                                 if not any(pathname_match(
 567                                         os.path.join(root_relpath, f), pat)
 568                                            for pat in self.exclude_paths)]
 569                    # Exclude files/dirs by name matching pattern
 570                    if self.exclude_names is not None:
 571                        dirs[:] = [d for d in dirs
 572                                   if not self.exclude_names.match(d)]
 573                        files = [f for f in files
 574                                 if not self.exclude_names.match(f)]
 575                    # Make os.walk()'s dir traversing order deterministic
 576                    dirs.sort()
 577                    files.sort()
 578                    for f in files:
 579                        filepath = os.path.join(root, f)
 580                        if not os.path.isfile(filepath):
 581                            self.logger.warning("Skipping non-regular file '{}'".format(filepath))
 582                            continue
 583                        # Add its size to the total bytes count (if applicable)
 584                        if self.follow_links or (not os.path.islink(filepath)):
 585                            if self.bytes_expected is not None:
 586                                self.bytes_expected += os.path.getsize(filepath)
 587                        self._check_file(filepath,
 588                                         os.path.join(root[len(prefixdir):], f))
 589            else:
 590                filepath = os.path.abspath(path)
 591                # Add its size to the total bytes count (if applicable)
 592                if self.follow_links or (not os.path.islink(filepath)):
 593                    if self.bytes_expected is not None:
 594                        self.bytes_expected += os.path.getsize(filepath)
 595                self._check_file(filepath,
 596                                 self.filename or os.path.basename(path))
 597        # If dry-run mode is on and we got up to this point, we should notify that
 598        # there aren't any files to upload.
 599        if self.dry_run:
 600            raise ArvPutUploadNotPending()
 601        # Remove local_collection's files that don't exist locally anymore, so the
 602        # bytes_written count is correct.
 603        for f in self.collection_file_paths(self._local_collection,
 604                                            path_prefix=""):
 605            if f != 'stdin' and f != self.filename and not f in self._file_paths:
 606                self._local_collection.remove(f)
 607
 608    def start(self, save_collection):
 609        """
 610        Start supporting thread & file uploading
 611        """
 612        self._checkpointer.start()
 613        try:
 614            # Update bytes_written from current local collection and
 615            # report initial progress.
 616            self._update()
 617            # Actual file upload
 618            self._upload_started = True # Used by the update thread to start checkpointing
 619            self._upload_files()
 620        except (SystemExit, Exception) as e:
 621            self._checkpoint_before_quit = False
 622            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
 623            # Note: We're expecting SystemExit instead of
 624            # KeyboardInterrupt because we have a custom signal
 625            # handler in place that raises SystemExit with the caught
 626            # signal's code.
 627            if isinstance(e, PathDoesNotExistError):
 628                # We aren't interested in the traceback for this case
 629                pass
 630            elif not isinstance(e, SystemExit) or e.code != -2:
 631                self.logger.warning("Abnormal termination:\n{}".format(
 632                    traceback.format_exc()))
 633            raise
 634        finally:
 635            if not self.dry_run:
 636                # Stop the thread before doing anything else
 637                self._stop_checkpointer.set()
 638                self._checkpointer.join()
 639                if self._checkpoint_before_quit:
 640                    # Commit all pending blocks & one last _update()
 641                    self._local_collection.manifest_text()
 642                    self._update(final=True)
 643                    if save_collection:
 644                        self.save_collection()
 645            if self.use_cache:
 646                self._cache_file.close()
 647
 648    def _collection_trash_at(self):
 649        """
 650        Returns the trash date that the collection should use at save time.
 651        Takes into account absolute/relative trash_at values requested
 652        by the user.
 653        """
 654        if type(self._trash_at) == datetime.timedelta:
 655            # Get an absolute datetime for trash_at
 656            return datetime.datetime.utcnow() + self._trash_at
 657        return self._trash_at
 658
 659    def save_collection(self):
 660        if self.update:
 661            # Check if files should be updated on the remote collection.
 662            for fp in self._file_paths:
 663                remote_file = self._remote_collection.find(fp)
 664                if not remote_file:
 665                    # File doesn't exist on the remote collection, copy it.
 666                    self._remote_collection.copy(fp, fp, self._local_collection)
 667                elif remote_file != self._local_collection.find(fp):
 668                    # A different file exists on the remote collection, overwrite it.
 669                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
 670                else:
 671                    # The file already exists on the remote collection, skip it.
 672                    pass
 673            self._remote_collection.save(num_retries=self.num_retries,
 674                                         trash_at=self._collection_trash_at())
 675        else:
 676            if len(self._local_collection) == 0:
 677                self.logger.warning("No files were uploaded, skipping collection creation.")
 678                return
 679            self._local_collection.save_new(
 680                name=self.name, owner_uuid=self.owner_uuid,
 681                ensure_unique_name=self.ensure_unique_name,
 682                num_retries=self.num_retries,
 683                trash_at=self._collection_trash_at())
 684
 685    def destroy_cache(self):
 686        if self.use_cache:
 687            try:
 688                os.unlink(self._cache_filename)
 689            except OSError as error:
 690                # That's what we wanted anyway.
 691                if error.errno != errno.ENOENT:
 692                    raise
 693            self._cache_file.close()
 694
 695    def _collection_size(self, collection):
 696        """
 697        Recursively get the total size of the collection
 698        """
 699        size = 0
 700        for item in listvalues(collection):
 701            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
 702                size += self._collection_size(item)
 703            else:
 704                size += item.size()
 705        return size
 706
 707    def _update_task(self):
 708        """
 709        Periodically called support task. File uploading is
 710        asynchronous so we poll status from the collection.
 711        """
 712        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
 713            self._update()
 714
 715    def _update(self, final=False):
 716        """
 717        Update cached manifest text and report progress.
 718        """
 719        if self._upload_started:
 720            with self._collection_lock:
 721                self.bytes_written = self._collection_size(self._local_collection)
 722                if self.use_cache:
 723                    if final:
 724                        manifest = self._local_collection.manifest_text()
 725                    else:
 726                        # Get the manifest text without committing pending blocks
 727                        manifest = self._local_collection.manifest_text(strip=False,
 728                                                                        normalize=False,
 729                                                                        only_committed=True)
 730                    # Update cache
 731                    with self._state_lock:
 732                        self._state['manifest'] = manifest
 733            if self.use_cache:
 734                try:
 735                    self._save_state()
 736                except Exception as e:
 737                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
 738            # Keep remote collection's trash_at attribute synced when using relative expire dates
 739            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
 740                try:
 741                    self._api_client.collections().update(
 742                        uuid=self._remote_collection.manifest_locator(),
 743                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
 744                    ).execute(num_retries=self.num_retries)
 745                except Exception as e:
 746                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
 747        else:
 748            self.bytes_written = self.bytes_skipped
 749        # Call the reporter, if any
 750        self.report_progress()
 751
 752    def report_progress(self):
 753        if self.reporter is not None:
 754            self.reporter(self.bytes_written, self.bytes_expected)
 755
 756    def _write_stdin(self, filename):
 757        output = self._local_collection.open(filename, 'wb')
 758        self._write(sys.stdin.buffer, output)
 759        output.close()
 760
 761    def _check_file(self, source, filename):
 762        """
 763        Check if this file needs to be uploaded
 764        """
 765        # Ignore symlinks when requested
 766        if (not self.follow_links) and os.path.islink(source):
 767            return
 768        resume_offset = 0
 769        should_upload = False
 770        new_file_in_cache = False
 771        # Record file path for updating the remote collection before exiting
 772        self._file_paths.add(filename)
 773
 774        with self._state_lock:
 775            # If there's no previously cached data for this file, store it for an
 776            # eventual repeated run.
 777            if source not in self._state['files']:
 778                self._state['files'][source] = {
 779                    'mtime': os.path.getmtime(source),
 780                    'size' : os.path.getsize(source)
 781                }
 782                new_file_in_cache = True
 783            cached_file_data = self._state['files'][source]
 784
 785        # Check if file was already uploaded (at least partially)
 786        file_in_local_collection = self._local_collection.find(filename)
 787
 788        # If not resuming, upload the full file.
 789        if not self.resume:
 790            should_upload = True
 791        # New file detected from last run, upload it.
 792        elif new_file_in_cache:
 793            should_upload = True
 794        # Local file didn't change from last run.
 795        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
 796            if not file_in_local_collection:
 797                # File not uploaded yet, upload it completely
 798                should_upload = True
 799            elif file_in_local_collection.permission_expired():
 800                # Permission token expired, re-upload file. This will change whenever
 801                # we have an API for refreshing tokens.
 802                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
 803                should_upload = True
 804                self._local_collection.remove(filename)
 805            elif cached_file_data['size'] == file_in_local_collection.size():
 806                # File already there, skip it.
 807                self.bytes_skipped += cached_file_data['size']
 808            elif cached_file_data['size'] > file_in_local_collection.size():
 809                # File partially uploaded, resume!
 810                resume_offset = file_in_local_collection.size()
 811                self.bytes_skipped += resume_offset
 812                should_upload = True
 813            else:
 814                # Inconsistent cache, re-upload the file
 815                should_upload = True
 816                self._local_collection.remove(filename)
 817                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
 818        # Local file differs from cached data, re-upload it.
 819        else:
 820            if file_in_local_collection:
 821                self._local_collection.remove(filename)
 822            should_upload = True
 823
 824        if should_upload:
 825            try:
 826                self._files_to_upload.append((source, resume_offset, filename))
 827            except ArvPutUploadIsPending:
 828                # This could happen when running in dry-run mode; close the cache
 829                # file to avoid locking issues.
 830                self._cache_file.close()
 831                raise
 832
 833    def _upload_files(self):
 834        for source, resume_offset, filename in self._files_to_upload:
 835            with open(source, 'rb') as source_fd:
 836                with self._state_lock:
 837                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
 838                    self._state['files'][source]['size'] = os.path.getsize(source)
 839                if resume_offset > 0:
 840                    # Start upload where we left off
 841                    output = self._local_collection.open(filename, 'ab')
 842                    source_fd.seek(resume_offset)
 843                else:
 844                    # Start from scratch
 845                    output = self._local_collection.open(filename, 'wb')
 846                self._write(source_fd, output)
 847                output.close(flush=False)
 848
 849    def _write(self, source_fd, output):
 850        while True:
 851            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
 852            if not data:
 853                break
 854            output.write(data)
 855
 856    def _my_collection(self):
 857        return self._remote_collection if self.update else self._local_collection
 858
 859    def _get_cache_filepath(self):
 860        # Set up cache file name from input paths.
 861        md5 = hashlib.md5()
 862        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
 863        realpaths = sorted(os.path.realpath(path) for path in self.paths)
 864        md5.update(b'\0'.join([p.encode() for p in realpaths]))
 865        if self.filename:
 866            md5.update(self.filename.encode())
 867        cache_filename = md5.hexdigest()
 868        cache_filepath = os.path.join(
 869            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
 870            cache_filename)
 871        return cache_filepath
 872
 873    def _setup_state(self, update_collection):
 874        """
 875        Create a new cache file or load a previously existing one.
 876        """
 877        # Load an already existing collection for update
 878        if update_collection and re.match(arvados.util.collection_uuid_pattern,
 879                                          update_collection):
 880            try:
 881                self._remote_collection = arvados.collection.Collection(
 882                    update_collection,
 883                    api_client=self._api_client,
 884                    storage_classes_desired=self.storage_classes,
 885                    num_retries=self.num_retries)
 886            except arvados.errors.ApiError as error:
 887                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
 888            else:
 889                self.update = True
 890        elif update_collection:
 891            # Collection locator provided, but unknown format
 892            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 893
 894        if self.use_cache:
 895            cache_filepath = self._get_cache_filepath()
 896            if self.resume and os.path.exists(cache_filepath):
 897                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
 898                self._cache_file = open(cache_filepath, 'a+')
 899            else:
 900                # --no-resume means start with an empty cache file.
 901                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
 902                self._cache_file = open(cache_filepath, 'w+')
 903            self._cache_filename = self._cache_file.name
 904            self._lock_file(self._cache_file)
 905            self._cache_file.seek(0)
 906
 907        with self._state_lock:
 908            if self.use_cache:
 909                try:
 910                    self._state = json.load(self._cache_file)
 911                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
 912                        # Cache at least partially incomplete, set up new cache
 913                        self._state = copy.deepcopy(self.EMPTY_STATE)
 914                except ValueError:
 915                    # Cache file empty, set up new cache
 916                    self._state = copy.deepcopy(self.EMPTY_STATE)
 917            else:
 918                self.logger.info("No cache usage requested for this run.")
 919                # No cache file, set empty state
 920                self._state = copy.deepcopy(self.EMPTY_STATE)
 921            if not self._cached_manifest_valid():
 922                if not self.batch_mode:
 923                    raise ResumeCacheInvalidError()
 924                else:
 925                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
 926                    self.use_cache = False # Don't overwrite preexisting cache file.
 927                    self._state = copy.deepcopy(self.EMPTY_STATE)
 928            # Load the previous manifest so we can check if files were modified remotely.
 929            self._local_collection = arvados.collection.Collection(
 930                self._state['manifest'],
 931                replication_desired=self.replication_desired,
 932                storage_classes_desired=self.storage_classes,
 933                put_threads=self.put_threads,
 934                api_client=self._api_client,
 935                num_retries=self.num_retries)
 936
 937    def _cached_manifest_valid(self):
 938        """
 939        Validate the oldest non-expired block signature to check if the cached manifest
 940        is usable, i.e. that it was not created with a different Arvados
 941        account's credentials.
 942        """
 943        if self._state.get('manifest', None) is None:
 944            # No cached manifest yet, all good.
 945            return True
 946        now = datetime.datetime.utcnow()
 947        oldest_exp = None
 948        oldest_loc = None
 949        block_found = False
 950        for m in keep_locator_pattern.finditer(self._state['manifest']):
 951            loc = m.group(0)
 952            try:
 953                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
 954            except IndexError:
 955                # Locator without signature
 956                continue
 957            block_found = True
 958            if exp > now and (oldest_exp is None or exp < oldest_exp):
 959                oldest_exp = exp
 960                oldest_loc = loc
 961        if not block_found:
 962            # No block signatures found => no invalid block signatures.
 963            return True
 964        if oldest_loc is None:
 965            # Locator signatures found, but all have expired.
 966            # Reset the cache and move on.
 967            self.logger.info('Cache expired, starting from scratch.')
 968            self._state['manifest'] = ''
 969            return True
 970        kc = arvados.KeepClient(api_client=self._api_client,
 971                                num_retries=self.num_retries)
 972        try:
 973            kc.head(oldest_loc)
 974        except arvados.errors.KeepRequestError:
 975            # Something is wrong, cached manifest is not valid.
 976            return False
 977        return True
 978
 979    def collection_file_paths(self, col, path_prefix='.'):
 980        """Return a list of file paths by recursively going through the entire collection `col`"""
 981        file_paths = []
 982        for name, item in listitems(col):
 983            if isinstance(item, arvados.arvfile.ArvadosFile):
 984                file_paths.append(os.path.join(path_prefix, name))
 985            elif isinstance(item, arvados.collection.Subcollection):
 986                new_prefix = os.path.join(path_prefix, name)
 987                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
 988        return file_paths
 989
 990    def _lock_file(self, fileobj):
 991        try:
 992            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
 993        except IOError:
 994            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
 995
 996    def _save_state(self):
 997        """
 998        Atomically save current state into cache.
 999        """
1000        with self._state_lock:
1001            # We're not using copy.deepcopy() here because it's a lot slower
1002            # than json.dumps(), and we already need the JSON format to be
1003            # saved on disk.
1004            state = json.dumps(self._state)
1005        try:
1006            new_cache = tempfile.NamedTemporaryFile(
1007                mode='w+',
1008                dir=os.path.dirname(self._cache_filename), delete=False)
1009            self._lock_file(new_cache)
1010            new_cache.write(state)
1011            new_cache.flush()
1012            os.fsync(new_cache)
1013            os.rename(new_cache.name, self._cache_filename)
1014        except (IOError, OSError, ResumeCacheConflict) as error:
1015            self.logger.error("There was a problem while saving the cache file: {}".format(error))
1016            try:
1017                os.unlink(new_cache.name)
1018            except NameError:  # NamedTemporaryFile() failed.
1019                pass
1020        else:
1021            self._cache_file.close()
1022            self._cache_file = new_cache
1023
1024    def collection_name(self):
1025        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1026
1027    def collection_trash_at(self):
1028        return self._my_collection().get_trash_at()
1029
1030    def manifest_locator(self):
1031        return self._my_collection().manifest_locator()
1032
1033    def portable_data_hash(self):
1034        pdh = self._my_collection().portable_data_hash()
1035        m = self._my_collection().stripped_manifest().encode()
1036        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1037        if pdh != local_pdh:
1038            self.logger.warning("\n".join([
1039                "arv-put: API server provided PDH differs from local manifest.",
1040                "         This should not happen; showing API server version."]))
1041        return pdh
1042
1043    def manifest_text(self, stream_name=".", strip=False, normalize=False):
1044        return self._my_collection().manifest_text(stream_name, strip, normalize)
1045
1046    def _datablocks_on_item(self, item):
1047        """
1048        Return a list of datablock locators, recursively navigating
1049        through subcollections
1050        """
1051        if isinstance(item, arvados.arvfile.ArvadosFile):
1052            if item.size() == 0:
1053                # Empty file locator
1054                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1055            else:
1056                locators = []
1057                for segment in item.segments():
1058                    loc = segment.locator
1059                    locators.append(loc)
1060                return locators
1061        elif isinstance(item, arvados.collection.Collection):
1062            l = [self._datablocks_on_item(x) for x in listvalues(item)]
1063            # Fast list flattener method taken from:
1064            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1065            return [loc for sublist in l for loc in sublist]
1066        else:
1067            return None
1068
1069    def data_locators(self):
1070        with self._collection_lock:
1071            # Make sure all datablocks are flushed before getting the locators
1072            self._my_collection().manifest_text()
1073            datablocks = self._datablocks_on_item(self._my_collection())
1074        return datablocks
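For orientation, a hypothetical programmatic upload that mirrors what main() does with this class; the path and name are placeholders and a configured Arvados API token is assumed:

import datetime
import logging

writer = ArvPutUploadJob(
    paths=['/tmp/dataset'],                   # placeholder input directory
    name='example upload',
    ensure_unique_name=True,
    use_cache=False, resume=False,            # skip the resume cache for a one-off run
    trash_at=datetime.timedelta(days=7),      # relative expiry, like --trash-after 7
    logger=logging.getLogger('arvados.arv_put'))
writer.start(save_collection=True)
print(writer.collection_name())
print(writer.manifest_locator())              # the collection UUID arv-put prints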
ArvPutUploadJob( paths, resume=True, use_cache=True, reporter=None, name=None, owner_uuid=None, api_client=None, batch_mode=False, ensure_unique_name=False, num_retries=None, put_threads=None, replication_desired=None, filename=None, update_time=60.0, update_collection=None, storage_classes=None, logger=<Logger arvados.arv_put (WARNING)>, dry_run=False, follow_links=True, exclude_paths=[], exclude_names=None, trash_at=None)
447    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
448                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
449                 ensure_unique_name=False, num_retries=None,
450                 put_threads=None, replication_desired=None, filename=None,
451                 update_time=60.0, update_collection=None, storage_classes=None,
452                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
453                 follow_links=True, exclude_paths=[], exclude_names=None,
454                 trash_at=None):
455        self.paths = paths
456        self.resume = resume
457        self.use_cache = use_cache
458        self.batch_mode = batch_mode
459        self.update = False
460        self.reporter = reporter
461        # This will be set to 0 before we start counting, if no special files are
462        # going to be read.
463        self.bytes_expected = None
464        self.bytes_written = 0
465        self.bytes_skipped = 0
466        self.name = name
467        self.owner_uuid = owner_uuid
468        self.ensure_unique_name = ensure_unique_name
469        self.num_retries = num_retries
470        self.replication_desired = replication_desired
471        self.put_threads = put_threads
472        self.filename = filename
473        self.storage_classes = storage_classes
474        self._api_client = api_client
475        self._state_lock = threading.Lock()
476        self._state = None # Previous run state (file list & manifest)
477        self._current_files = [] # Current run file list
478        self._cache_file = None
479        self._collection_lock = threading.Lock()
480        self._remote_collection = None # Collection being updated (if asked)
481        self._local_collection = None # Collection from previous run manifest
482        self._file_paths = set() # Files to be updated in remote collection
483        self._stop_checkpointer = threading.Event()
484        self._checkpointer = threading.Thread(target=self._update_task)
485        self._checkpointer.daemon = True
486        self._update_task_time = update_time  # How many seconds to wait between update runs
487        self._files_to_upload = FileUploadList(dry_run=dry_run)
488        self._upload_started = False
489        self.logger = logger
490        self.dry_run = dry_run
491        self._checkpoint_before_quit = True
492        self.follow_links = follow_links
493        self.exclude_paths = exclude_paths
494        self.exclude_names = exclude_names
495        self._trash_at = trash_at
496
497        if self._trash_at is not None:
498            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
499                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
500            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
501                raise TypeError('provided trash_at datetime should be timezone-naive')
502
503        if not self.use_cache and self.resume:
504            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
505
506        # Check for obvious dry-run responses
507        if self.dry_run and (not self.use_cache or not self.resume):
508            raise ArvPutUploadIsPending()
509
510        # Load cached data if any and if needed
511        self._setup_state(update_collection)
512
513        # Build the upload file list, excluding requested files and counting the
514        # bytes expected to be uploaded.
515        self._build_upload_list()
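
Example (a minimal sketch, assuming a configured Arvados client via
ARVADOS_API_HOST/ARVADOS_API_TOKEN and a local ./data directory; the name
'example upload' is purely illustrative):

    import arvados
    from arvados.commands.put import ArvPutUploadJob

    api = arvados.api('v1')
    job = ArvPutUploadJob(paths=['data/'],
                          name='example upload',
                          api_client=api,
                          num_retries=3,
                          ensure_unique_name=True)
    job.start(save_collection=True)   # upload the files, then save a Collection
    print(job.manifest_locator())     # the saved collection's UUID
    job.destroy_cache()               # remove the resume cache once finished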
CACHE_DIR = '.cache/arvados/arv-put'
EMPTY_STATE = {'manifest': None, 'files': {}}
paths
resume
use_cache
batch_mode
update
reporter
bytes_expected
bytes_written
bytes_skipped
name
owner_uuid
ensure_unique_name
num_retries
replication_desired
put_threads
filename
storage_classes
logger
dry_run
exclude_paths
exclude_names
def start(self, save_collection):
608    def start(self, save_collection):
609        """
610        Start supporting thread & file uploading
611        """
612        self._checkpointer.start()
613        try:
614            # Update bytes_written from current local collection and
615            # report initial progress.
616            self._update()
617            # Actual file upload
618            self._upload_started = True # Used by the update thread to start checkpointing
619            self._upload_files()
620        except (SystemExit, Exception) as e:
621            self._checkpoint_before_quit = False
622            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
623            # Note: We're expecting SystemExit instead of
624            # KeyboardInterrupt because we have a custom signal
625            # handler in place that raises SystemExit with the caught
626            # signal's code.
627            if isinstance(e, PathDoesNotExistError):
628                # We aren't interested in the traceback for this case
629                pass
630            elif not isinstance(e, SystemExit) or e.code != -2:
631                self.logger.warning("Abnormal termination:\n{}".format(
632                    traceback.format_exc()))
633            raise
634        finally:
635            if not self.dry_run:
636                # Stop the thread before doing anything else
637                self._stop_checkpointer.set()
638                self._checkpointer.join()
639                if self._checkpoint_before_quit:
640                    # Commit all pending blocks & one last _update()
641                    self._local_collection.manifest_text()
642                    self._update(final=True)
643                    if save_collection:
644                        self.save_collection()
645            if self.use_cache:
646                self._cache_file.close()

Start supporting thread & file uploading

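With dry_run=True the outcome is decided while the job is being built, before
start() is ever reached; main() below maps these exceptions to exit codes 2
and 0. A hedged sketch of that pattern:

    from arvados.commands.put import (ArvPutUploadJob, ArvPutUploadIsPending,
                                      ArvPutUploadNotPending)

    try:
        ArvPutUploadJob(paths=['data/'], dry_run=True)
    except ArvPutUploadIsPending:
        print("files are pending for upload")   # arv-put --dry-run exits with 2
    except ArvPutUploadNotPending:
        print("nothing new to upload")          # arv-put --dry-run exits with 0
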
def save_collection(self):
659    def save_collection(self):
660        if self.update:
661            # Check if files should be updated on the remote collection.
662            for fp in self._file_paths:
663                remote_file = self._remote_collection.find(fp)
664                if not remote_file:
665                    # File doesn't exist on the remote collection, copy it.
666                    self._remote_collection.copy(fp, fp, self._local_collection)
667                elif remote_file != self._local_collection.find(fp):
668                    # A different file exists on the remote collection, overwrite it.
669                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
670                else:
671                    # The file already exists on the remote collection, skip it.
672                    pass
673            self._remote_collection.save(num_retries=self.num_retries,
674                                         trash_at=self._collection_trash_at())
675        else:
676            if len(self._local_collection) == 0:
677                self.logger.warning("No files were uploaded, skipping collection creation.")
678                return
679            self._local_collection.save_new(
680                name=self.name, owner_uuid=self.owner_uuid,
681                ensure_unique_name=self.ensure_unique_name,
682                num_retries=self.num_retries,
683                trash_at=self._collection_trash_at())
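
Example of the update path (a sketch; the collection UUID is a placeholder and
must name an existing, writable collection):

    job = ArvPutUploadJob(paths=['data/'],
                          update_collection='zzzzz-4zz18-xxxxxxxxxxxxxxx')  # placeholder UUID
    job.start(save_collection=True)   # copies new/changed files into the remote collection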
def destroy_cache(self):
685    def destroy_cache(self):
686        if self.use_cache:
687            try:
688                os.unlink(self._cache_filename)
689            except OSError as error:
690                # That's what we wanted anyway.
691                if error.errno != errno.ENOENT:
692                    raise
693            self._cache_file.close()
def report_progress(self):
752    def report_progress(self):
753        if self.reporter is not None:
754            self.reporter(self.bytes_written, self.bytes_expected)
def collection_file_paths(self, col, path_prefix='.'):
979    def collection_file_paths(self, col, path_prefix='.'):
980        """Return a list of file paths by recursively go through the entire collection `col`"""
981        file_paths = []
982        for name, item in listitems(col):
983            if isinstance(item, arvados.arvfile.ArvadosFile):
984                file_paths.append(os.path.join(path_prefix, name))
985            elif isinstance(item, arvados.collection.Subcollection):
986                new_prefix = os.path.join(path_prefix, name)
987                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
988        return file_paths

Return a list of file paths by recursively going through the entire collection col

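For example (a sketch; the locator is a placeholder, and `job` is the
ArvPutUploadJob instance from the earlier sketch):

    import arvados.collection

    col = arvados.collection.Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')  # placeholder locator
    job.collection_file_paths(col)                    # e.g. ['./a.txt', './sub/b.txt']
    job.collection_file_paths(col, path_prefix='')    # same paths without the './' prefix
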
def collection_name(self):
1024    def collection_name(self):
1025        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
def collection_trash_at(self):
1027    def collection_trash_at(self):
1028        return self._my_collection().get_trash_at()
def manifest_locator(self):
1030    def manifest_locator(self):
1031        return self._my_collection().manifest_locator()
def portable_data_hash(self):
1033    def portable_data_hash(self):
1034        pdh = self._my_collection().portable_data_hash()
1035        m = self._my_collection().stripped_manifest().encode()
1036        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1037        if pdh != local_pdh:
1038            self.logger.warning("\n".join([
1039                "arv-put: API server provided PDH differs from local manifest.",
1040                "         This should not happen; showing API server version."]))
1041        return pdh
def manifest_text(self, stream_name='.', strip=False, normalize=False):
1043    def manifest_text(self, stream_name=".", strip=False, normalize=False):
1044        return self._my_collection().manifest_text(stream_name, strip, normalize)
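
Once start() has finished, the manifest can be emitted directly instead of
saving a Collection object; this is what the --stream and --normalize options
use in main() below:

    print(job.manifest_text(), end='')                # manifest as written
    print(job.manifest_text(normalize=True), end='')  # files and streams re-ordered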
def data_locators(self):
1069    def data_locators(self):
1070        with self._collection_lock:
1071            # Make sure all datablocks are flushed before getting the locators
1072            self._my_collection().manifest_text()
1073            datablocks = self._datablocks_on_item(self._my_collection())
1074        return datablocks
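
data_locators() flushes any pending blocks and returns the Keep block
locators; --raw output in main() below is just these joined with commas:

    print(','.join(job.data_locators()))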
def pathname_match(pathname, pattern):
1083def pathname_match(pathname, pattern):
1084    name = pathname.split(os.sep)
1085    # Fix patterns like 'some/subdir/' or 'some//subdir'
1086    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1087    if len(name) != len(pat):
1088        return False
1089    for i in range(len(name)):
1090        if not fnmatch.fnmatch(name[i], pat[i]):
1091            return False
1092    return True
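
pathname_match() compares path components pairwise with fnmatch, so a pattern
only matches paths of the same depth; empty and '.' components in the pattern
are dropped first. For example:

    from arvados.commands.put import pathname_match

    pathname_match('photos/2023/img.jpg', 'photos/*/*.jpg')   # True  (3 components each)
    pathname_match('photos/img.jpg',      'photos/*/*.jpg')   # False (depth differs)
    pathname_match('photos/img.jpg',      './photos//*.jpg')  # True  ('.' and '' are ignored)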
def machine_progress(bytes_written, bytes_expected):
1094def machine_progress(bytes_written, bytes_expected):
1095    return _machine_format.format(
1096        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
def human_progress(bytes_written, bytes_expected):
1098def human_progress(bytes_written, bytes_expected):
1099    if bytes_expected:
1100        return "\r{}M / {}M {:.1%} ".format(
1101            bytes_written >> 20, bytes_expected >> 20,
1102            float(bytes_written) / bytes_expected)
1103    else:
1104        return "\r{} ".format(bytes_written)
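
Both formatters take the written and expected byte counts; human_progress()
reports megabytes and a percentage, or only the byte count when the total is
unknown:

    human_progress(50 << 20, 100 << 20)   # '\r50M / 100M 50.0% '
    human_progress(1234, None)            # '\r1234 '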
def progress_writer(progress_func, outfile=sys.stderr):
1106def progress_writer(progress_func, outfile=sys.stderr):
1107    def write_progress(bytes_written, bytes_expected):
1108        outfile.write(progress_func(bytes_written, bytes_expected))
1109    return write_progress
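
progress_writer() adapts one of the formatters above into the reporter
callback expected by ArvPutUploadJob; this is how --progress and
--batch-progress are wired up in main() below:

    reporter = progress_writer(human_progress)   # writes to sys.stderr by default
    reporter(50 << 20, 100 << 20)                # emits '\r50M / 100M 50.0% '
    job = ArvPutUploadJob(paths=['data/'], reporter=reporter)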
def desired_project_uuid(api_client, project_uuid, num_retries):
1111def desired_project_uuid(api_client, project_uuid, num_retries):
1112    if not project_uuid:
1113        query = api_client.users().current()
1114    elif arvados.util.user_uuid_pattern.match(project_uuid):
1115        query = api_client.users().get(uuid=project_uuid)
1116    elif arvados.util.group_uuid_pattern.match(project_uuid):
1117        query = api_client.groups().get(uuid=project_uuid)
1118    else:
1119        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1120    return query.execute(num_retries=num_retries)['uuid']
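
desired_project_uuid() resolves where the new collection will be created: a
user UUID, a group (project) UUID, or the current user when none is given.
A sketch, assuming api_client is an already configured client (the project
UUID below is a placeholder):

    owner = desired_project_uuid(api_client, None, num_retries=3)
    owner = desired_project_uuid(api_client, 'zzzzz-j7d0g-xxxxxxxxxxxxxxx', 3)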
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr, install_sig_handlers=True):
1122def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1123         install_sig_handlers=True):
1124    global api_client
1125
1126    args = parse_arguments(arguments)
1127    logger = logging.getLogger('arvados.arv_put')
1128    if args.silent:
1129        logger.setLevel(logging.WARNING)
1130    else:
1131        logger.setLevel(logging.INFO)
1132    status = 0
1133
1134    request_id = arvados.util.new_request_id()
1135
1136    formatter = ArvPutLogFormatter(request_id)
1137    logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1138
1139    if api_client is None:
1140        api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
1141
1142    if install_sig_handlers:
1143        arv_cmd.install_signal_handlers()
1144
1145    # Trash arguments validation
1146    trash_at = None
1147    if args.trash_at is not None:
1148        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1149        # make sure the user provides a complete YYYY-MM-DD date.
1150        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1151            logger.error("--trash-at argument format invalid, use --help to see examples.")
1152            sys.exit(1)
1153        # Check if no time information was provided. In that case, assume end-of-day.
1154        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1155            args.trash_at += 'T23:59:59'
1156        try:
1157            trash_at = ciso8601.parse_datetime(args.trash_at)
1158        except:
1159            logger.error("--trash-at argument format invalid, use --help to see examples.")
1160            sys.exit(1)
1161        else:
1162            if trash_at.tzinfo is not None:
1163                # Timezone aware datetime provided.
1164                utcoffset = -trash_at.utcoffset()
1165            else:
1166                # Timezone-naive datetime provided. Assume it's local.
1167                if time.daylight:
1168                    utcoffset = datetime.timedelta(seconds=time.altzone)
1169                else:
1170                    utcoffset = datetime.timedelta(seconds=time.timezone)
1171            # Convert to UTC timezone naive datetime.
1172            trash_at = trash_at.replace(tzinfo=None) + utcoffset
1173
1174        if trash_at <= datetime.datetime.utcnow():
1175            logger.error("--trash-at argument must be set in the future")
1176            sys.exit(1)
1177    if args.trash_after is not None:
1178        if args.trash_after < 1:
1179            logger.error("--trash-after argument must be >= 1")
1180            sys.exit(1)
1181        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1182
1183    # Determine the name to use
1184    if args.name:
1185        if args.stream or args.raw:
1186            logger.error("Cannot use --name with --stream or --raw")
1187            sys.exit(1)
1188        elif args.update_collection:
1189            logger.error("Cannot use --name with --update-collection")
1190            sys.exit(1)
1191        collection_name = args.name
1192    else:
1193        collection_name = "Saved at {} by {}@{}".format(
1194            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1195            pwd.getpwuid(os.getuid()).pw_name,
1196            socket.gethostname())
1197
1198    if args.project_uuid and (args.stream or args.raw):
1199        logger.error("Cannot use --project-uuid with --stream or --raw")
1200        sys.exit(1)
1201
1202    # Determine the parent project
1203    try:
1204        project_uuid = desired_project_uuid(api_client, args.project_uuid,
1205                                            args.retries)
1206    except (apiclient_errors.Error, ValueError) as error:
1207        logger.error(error)
1208        sys.exit(1)
1209
1210    if args.progress:
1211        reporter = progress_writer(human_progress)
1212    elif args.batch_progress:
1213        reporter = progress_writer(machine_progress)
1214    else:
1215        reporter = None
1216
1217    #  Split storage-classes argument
1218    storage_classes = None
1219    if args.storage_classes:
1220        storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
1221
1222    # Setup exclude regex from all the --exclude arguments provided
1223    name_patterns = []
1224    exclude_paths = []
1225    exclude_names = None
1226    if len(args.exclude) > 0:
1227        # We're supporting 2 kinds of exclusion patterns:
1228        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1229        #                            the name, wherever the file is on the tree)
1230        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1231        #                            entire path, and should be relative to
1232        #                            any input dir argument)
1233        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1234        #                            placed directly underneath the input dir)
1235        for p in args.exclude:
1236            # Only relative path patterns are allowed
1237            if p.startswith(os.sep):
1238                logger.error("Cannot use absolute paths with --exclude")
1239                sys.exit(1)
1240            if os.path.dirname(p):
1241                # We don't support path patterns with '..'
1242                p_parts = p.split(os.sep)
1243                if '..' in p_parts:
1244                    logger.error(
1245                        "Cannot use path patterns that include or '..'")
1246                    sys.exit(1)
1247                # Path search pattern
1248                exclude_paths.append(p)
1249            else:
1250                # Name-only search pattern
1251                name_patterns.append(p)
1252        # For name only matching, we can combine all patterns into a single
1253        # regexp, for better performance.
1254        exclude_names = re.compile('|'.join(
1255            [fnmatch.translate(p) for p in name_patterns]
1256        )) if len(name_patterns) > 0 else None
1257        # Show the user the patterns to be used, just in case they weren't
1258        # specified inside quotes and got changed by the shell expansion.
1259        logger.info("Exclude patterns: {}".format(args.exclude))
1260
1261    # If this is used by a human, and there's at least one directory to be
1262    # uploaded, the expected bytes calculation can take a moment.
1263    if args.progress and any([os.path.isdir(f) for f in args.paths]):
1264        logger.info("Calculating upload size, this could take some time...")
1265    try:
1266        writer = ArvPutUploadJob(paths = args.paths,
1267                                 resume = args.resume,
1268                                 use_cache = args.use_cache,
1269                                 batch_mode = args.batch,
1270                                 filename = args.filename,
1271                                 reporter = reporter,
1272                                 api_client = api_client,
1273                                 num_retries = args.retries,
1274                                 replication_desired = args.replication,
1275                                 put_threads = args.threads,
1276                                 name = collection_name,
1277                                 owner_uuid = project_uuid,
1278                                 ensure_unique_name = True,
1279                                 update_collection = args.update_collection,
1280                                 storage_classes=storage_classes,
1281                                 logger=logger,
1282                                 dry_run=args.dry_run,
1283                                 follow_links=args.follow_links,
1284                                 exclude_paths=exclude_paths,
1285                                 exclude_names=exclude_names,
1286                                 trash_at=trash_at)
1287    except ResumeCacheConflict:
1288        logger.error("\n".join([
1289            "arv-put: Another process is already uploading this data.",
1290            "         Use --no-cache if this is really what you want."]))
1291        sys.exit(1)
1292    except ResumeCacheInvalidError:
1293        logger.error("\n".join([
1294            "arv-put: Resume cache contains invalid signature: it may have expired",
1295            "         or been created with another Arvados user's credentials.",
1296            "         Switch user or use one of the following options to restart upload:",
1297            "         --no-resume to start a new resume cache.",
1298            "         --no-cache to disable resume cache.",
1299            "         --batch to ignore the resume cache if invalid."]))
1300        sys.exit(1)
1301    except (CollectionUpdateError, PathDoesNotExistError) as error:
1302        logger.error("\n".join([
1303            "arv-put: %s" % str(error)]))
1304        sys.exit(1)
1305    except ArvPutUploadIsPending:
1306        # Dry run check successful, return proper exit code.
1307        sys.exit(2)
1308    except ArvPutUploadNotPending:
1309        # No files pending for upload
1310        sys.exit(0)
1311
1312    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1313        logger.warning("\n".join([
1314            "arv-put: Resuming previous upload from last checkpoint.",
1315            "         Use the --no-resume option to start over."]))
1316
1317    if not args.dry_run:
1318        writer.report_progress()
1319    output = None
1320    try:
1321        writer.start(save_collection=not(args.stream or args.raw))
1322    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
1323        logger.error("\n".join([
1324            "arv-put: %s" % str(error)]))
1325        sys.exit(1)
1326
1327    if args.progress:  # Print newline to split stderr from stdout for humans.
1328        logger.info("\n")
1329
1330    if args.stream:
1331        if args.normalize:
1332            output = writer.manifest_text(normalize=True)
1333        else:
1334            output = writer.manifest_text()
1335    elif args.raw:
1336        output = ','.join(writer.data_locators())
1337    elif writer.manifest_locator() is not None:
1338        try:
1339            expiration_notice = ""
1340            if writer.collection_trash_at() is not None:
1341                # Get the local timezone-naive version, and log it with timezone information.
1342                if time.daylight:
1343                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1344                else:
1345                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1346                expiration_notice = ". It will expire on {} {}.".format(
1347                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1348            if args.update_collection:
1349                logger.info(u"Collection updated: '{}'{}".format(
1350                    writer.collection_name(), expiration_notice))
1351            else:
1352                logger.info(u"Collection saved as '{}'{}".format(
1353                    writer.collection_name(), expiration_notice))
1354            if args.portable_data_hash:
1355                output = writer.portable_data_hash()
1356            else:
1357                output = writer.manifest_locator()
1358        except apiclient_errors.Error as error:
1359            logger.error(
1360                "arv-put: Error creating Collection on project: {}.".format(
1361                    error))
1362            status = 1
1363    else:
1364        status = 1
1365
1366    # Print the locator (uuid) of the new collection.
1367    if output is None:
1368        status = status or 1
1369    elif not args.silent:
1370        stdout.write(output)
1371        if not output.endswith('\n'):
1372            stdout.write('\n')
1373
1374    if install_sig_handlers:
1375        arv_cmd.restore_signal_handlers()
1376
1377    if status != 0:
1378        sys.exit(status)
1379
1380    # Success!
1381    return output
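
main() can also be called programmatically: it parses the same CLI arguments,
may call sys.exit() on errors, and returns the output string (a locator or a
manifest) on success. A hedged sketch, assuming a configured Arvados client
and a local ./data directory:

    import io
    from arvados.commands.put import main

    out = io.StringIO()
    locator = main(['data/'], stdout=out, install_sig_handlers=False)
    print(locator)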