arvados.commands.arv_copy

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# arv-copy [--recursive] [--no-recursive] object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a workflow, arv-copy copies the workflow,
# input collections, and docker images). If
# --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have configuration files {src}.conf and
# {dst}.conf in a standard configuration directory with valid login credentials
# for instances src and dst.  If either of these files is not found,
# arv-copy will issue an error.

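# Example invocation (hypothetical cluster IDs and UUIDs): with
# pirca.conf and jutro.conf present in ~/.config/arvados, the following
# copies a workflow and its dependencies from cluster pirca into a
# project on cluster jutro:
#
#   arv-copy --src pirca --dst jutro \
#       --project-uuid jutro-j7d0g-0123456789abcde \
#       pirca-7fd4e-0123456789abcde
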
import argparse
import contextlib
import getpass
import os
import re
import shutil
import subprocess
import sys
import logging
import tempfile
import urllib.parse
import io
import json
import queue
import threading
import errno

import httplib2.error
import googleapiclient

import arvados
import arvados.api
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
from arvados.logging import log_handler

from arvados._internal import basedirs, http_to_keep, s3_to_keep, to_keep_util
from arvados._version import __version__

COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

arvlogger = logging.getLogger('arvados')
keeplogger = logging.getLogger('arvados.keep')
logger = logging.getLogger('arvados.arv-copy')

# Set this up so connection errors get logged.
googleapi_logger = logging.getLogger('googleapiclient.http')

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None
def main():
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--src', dest='source_arvados',
        help="""
Client configuration location for the source Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
""",
    )
    copy_opts.add_argument(
        '--dst', dest='destination_arvados',
        help="""
Client configuration location for the destination Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will use the default client configuration from the environment or `settings.conf`.
""",
    )
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object, and subprojects. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies or subprojects.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
    copy_opts.add_argument(
        '--replication',
        type=arv_cmd.RangedValue(int, range(1, sys.maxsize)),
        metavar='N',
        help="""
Number of replicas per storage class for the copied collections at the destination.
If not provided (or if provided with an invalid value),
use the destination's default replication-level setting (if found),
or the fallback value 2.
""")
    copy_opts.add_argument(
        '--storage-classes',
        type=arv_cmd.UniqueSplit(),
        help='Comma-separated list of storage classes to be used when saving data to the destination Arvados instance.')
    copy_opts.add_argument(
        '--block-copy',
        dest='keep_block_copy',
        action='store_true',
        help="""Copy Keep blocks when copying collections (default).""",
    )
    copy_opts.add_argument(
        '--no-block-copy',
        dest='keep_block_copy',
        action='store_false',
        help="""Do not copy Keep blocks when copying collections. Must have
administrator privileges on the destination cluster to create collections.
""")

    copy_opts.add_argument("--varying-url-params", type=str, default="",
                        help="A comma-separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")

    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
                        help="If an HTTP URL is found in Keep, skip the upstream URL freshness check (will not notice if the upstream has changed, but also will not error if upstream is unavailable).")
    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')

    copy_opts.set_defaults(
        # export_all_fields is used by external tools to make complete copies
        # of Arvados records.
        export_all_fields=False,
        keep_block_copy=True,
        progress=True,
        recursive=True,
    )

    parser = argparse.ArgumentParser(
        description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        arvlogger.setLevel(logging.DEBUG)
    else:
        arvlogger.setLevel(logging.INFO)
        keeplogger.setLevel(logging.WARNING)

    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
        args.source_arvados = args.object_uuid[:5]

    if not args.destination_arvados and args.project_uuid:
        args.destination_arvados = args.project_uuid[:5]

    # Make sure errors trying to connect to clusters get logged.
    googleapi_logger.setLevel(logging.WARN)
    googleapi_logger.addHandler(log_handler)

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados, args.retries)
    dst_arv = api_for_instance(args.destination_arvados, args.retries)

    # Once we've successfully contacted the clusters, we probably
    # don't want to see logging about retries (unless the user asked
    # for verbose output).
    if not args.verbose:
        googleapi_logger.setLevel(logging.ERROR)

    if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]:
        logger.info("Copying within cluster %s", src_arv.config()["ClusterID"])
    else:
        logger.info("Source cluster is %s", src_arv.config()["ClusterID"])
        logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"])

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)

    try:
        if t == 'Collection':
            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
            result = copy_collection(args.object_uuid,
                                     src_arv, dst_arv,
                                     args)
        elif t == 'Workflow':
            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
        elif t == 'Group':
            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
        elif t == 'httpURL' or t == 's3URL':
            result = copy_from_url(args.object_uuid, src_arv, dst_arv, args)
        else:
            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
    except Exception as e:
        logger.error("%s", e, exc_info=args.verbose)
        exit(1)

    if not result:
        exit(1)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if result is None or 'error_token' in result or 'uuid' not in result:
        if result:
            logger.error("API server returned an error result: {}".format(result))
        exit(1)

    print(result['uuid'])

    if result.get('partial_error'):
        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)

def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")

# api_for_instance(instance_name, num_retries)
#
#     Creates an API client for the Arvados instance identified by
#     instance_name.
#
#     If instance_name contains a slash, it is presumed to be a path
#     (either relative or absolute) to a file with Arvados configuration
#     settings.
#
#     Otherwise, it is presumed to be the name of a file in a standard
#     configuration directory.
#
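# Example contents of such a configuration file (hypothetical host and
# token values):
#
#     ARVADOS_API_HOST=pirca.arvadosapi.com
#     ARVADOS_API_TOKEN=v2/pirca-gj3su-0123456789abcde/examplesecrettoken
#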
def api_for_instance(instance_name, num_retries):
    msg = []
    if instance_name:
        if '/' in instance_name:
            config_file = instance_name
        else:
            dirs = basedirs.BaseDirectories('CONFIG')
            config_file = next(dirs.search(f'{instance_name}.conf'), '')

        arvados.api._reset_googleapiclient_logging()
        try:
            cfg = arvados.config.load(config_file)

            if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
                api_is_insecure = (
                    cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                        ['1', 't', 'true', 'y', 'yes']))
                return arvados.api('v1',
                                   host=cfg['ARVADOS_API_HOST'],
                                   token=cfg['ARVADOS_API_TOKEN'],
                                   insecure=api_is_insecure,
                                   num_retries=num_retries,
                                   )
            else:
                msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file))
        except OSError as e:
            if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH):
                verb = 'connect to instance from'
            elif config_file:
                verb = 'open'
            else:
                verb = 'find'
                searchlist = ":".join(str(p) for p in dirs.search_paths())
                config_file = f'{instance_name}.conf in path {searchlist}'
            msg.append(("Could not {} config file {}: {}").format(
                       verb, config_file, e.strerror))
        except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e:
            msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e))

    arvados.api._reset_googleapiclient_logging()
    default_api = None
    default_instance = None
    try:
        default_api = arvados.api('v1', num_retries=num_retries)
        default_instance = default_api.config()["ClusterID"]
    except ValueError:
        pass
    except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e:
        msg.append("Failed to connect to default instance, error was {}".format(e))

    if default_api is not None and (not instance_name or instance_name == default_instance):
        # Use default settings
        return default_api

    if instance_name and default_instance and instance_name != default_instance:
        msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name))

    for m in msg:
        logger.error(m)

    abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')

# Check if git is available
def check_git_availability():
    try:
        subprocess.run(
            ['git', '--version'],
            check=True,
            stdout=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        abort('git command is not available. Please ensure git is installed.')


def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, str):
        yield arg
    else:
        yield from arg
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")

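# Example for migrate_repository_filter above (hypothetical repository
# names): with src_repository='arvados' and dst_repository='arvados-dev',
# the filter ['repository', '=', 'arvados'] is rewritten in place to
# ['repository', '=', 'arvados-dev'].
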
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")

def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))

@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)


# copy_workflow(wf_uuid, src, dst, args)
#
#    Copies a workflow identified by wf_uuid from src to dst.
#
#    If args.recursive is True, also copy any collections
#      referenced in the workflow definition yaml.
#
#    The owner_uuid of the new workflow is set to any given
#      project_uuid or the user who copied the template.
#
#    Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition; it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        try:
            result = subprocess.run(
                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
                env=env,
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
        except FileNotFoundError:
            no_arv_copy = True
        else:
            no_arv_copy = result.returncode == 2

        if no_arv_copy:
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        locations = json.loads(result.stdout)

        if locations:
            copy_collections(locations, src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid

    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    else:
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)


def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)

# copy_collections(obj, src, dst, args)
#
#    Recursively copies all collections referenced by 'obj' from src
#    to dst.  obj may be a dict or a list, in which case we run
#    copy_collections on every value it contains. If it is a string,
#    search it for any substring that matches a collection hash or uuid
#    (this will find hidden references to collections like
#      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
#    Returns a copy of obj with any old collection uuids replaced by
#    the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).

        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, str):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj


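# A worked example for total_collection_size below (hypothetical file
# names; acbd18db... is the real MD5 of "foo"): given the manifest text
#
#     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
#     ./dir acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar.txt
#
# the result is 3, not 6, because both streams reference the same
# 3-byte block and duplicate locators are counted only once.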
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes

def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    if args.export_all_fields:
        body = c.copy()
    else:
        body = {key: c[key] for key in [
            'description',
            'manifest_text',
            'name',
            'portable_data_hash',
            'properties',
        ]}
        if not body['name']:
            body['name'] = f"copied from {collection_uuid}"
        if args.storage_classes:
            body['storage_classes_desired'] = args.storage_classes
        body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection

# copy_collection(obj_uuid, src, dst, args)
#
#    Copies the collection identified by obj_uuid from src to dst.
#    Returns the collection object created at dst.
#
#    If args.progress is True, produce a human-friendly progress
#    report.
#
#    If a collection with the desired portable_data_hash already
#    exists at dst, and args.force is False, copy_collection returns
#    the existing collection without copying any blocks.  Otherwise
#    (if no collection exists or if args.force is True)
#    copy_collection copies all of the collection data blocks from src
#    to dst.
#
#    For this application, it is critical to preserve the
#    collection's manifest hash, which is not guaranteed with the
#    arvados.CollectionReader and arvados.CollectionWriter classes.
#    Copying each block in the collection manually, followed by
#    the manifest block, ensures that the collection's manifest
#    hash will not change.
#
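# A note on the rewritten manifest: each locator returned by
# dst_keep.put() typically carries a permission signature hint, e.g.
# (hypothetical signature and expiry):
#
#     acbd18db4cc2f85cedef654fccc4a4d8+3+A0f1e2d...@65f2c0e8
#
# These signed destination locators are what get substituted into
# dst_manifest below.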
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times), so go and retrieve the collection object directly,
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    if args.replication is None:
        # Obtain default or fallback collection replication setting on the
        # destination
        try:
            args.replication = int(dst.config()["Collections"]["DefaultReplication"])
        except (KeyError, TypeError, ValueError):
            args.replication = 2

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = src.keep
    dst_keep = dst.keep
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest

    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    threadcount = 4

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()

    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()

    if args.keep_block_copy:
        for line in manifest.splitlines():
            words = line.split()
            for word in words[1:]:
                try:
                    loc = arvados.KeepLocator(word)
                except ValueError:
                    # If 'word' can't be parsed as a locator,
                    # presume it's a filename.
                    continue

                get_queue.put(word)

        for i in range(0, threadcount):
            get_queue.put(None)

        for i in range(0, threadcount):
            threading.Thread(target=get_thread, daemon=True).start()

        for i in range(0, threadcount):
            threading.Thread(target=put_thread, daemon=True).start()

        get_queue.join()
        put_queue.join()

        if len(transfer_error) > 0:
            return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = iter(line.split())
        out_words = [next(words)]
        for word in words:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                out_words.append(word)
            else:
                if args.keep_block_copy:
                    out_words.append(dst_locators[loc.md5sum])
                else:
                    out_words.append(loc.stripped())
        dst_manifest.write(' '.join(out_words))
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)

def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

def copy_project(obj_uuid, src, dst, owner_uuid, args):

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    try:
        existing_uuid = existing['items'][0]['uuid']
    except IndexError:
        body = src_project_record if args.export_all_fields else {'group': {
            'description': src_project_record['description'],
            'group_class': 'project',
            'name': src_project_record['name'],
            'owner_uuid': owner_uuid,
        }}
        project_req = dst.groups().create(body=body)
    else:
        project_req = dst.groups().update(
            uuid=existing_uuid,
            body={'group': {
                'description': src_project_record['description'],
            }},
        )

    project_record = project_req.execute(num_retries=args.retries)
    args.project_uuid = project_record["uuid"]
    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""
    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record

# git_rev_parse(rev, repo)
#
#    Returns the 40-character commit hash corresponding to 'rev' in
#    git repository 'repo' (which must be the path of a local git
#    repository)
#
def git_rev_parse(rev, repo):
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        text=True,
    )
    return proc.stdout.strip()

# uuid_type(api, object_uuid)
#
#    Returns the name of the class that object_uuid belongs to, based on
#    the second field of the uuid.  This function consults the api's
#    schema to identify the object class.
#
#    It returns a string such as 'Collection', 'Workflow', etc.
#
#    Special case: if handed a Keep locator hash, return 'Collection'.
#
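# For example (hypothetical UUIDs; the prefix-to-class mapping comes
# from the API schema, e.g. '4zz18' for collections, 'j7d0g' for groups):
#
#     uuid_type(api, 'zzzzz-4zz18-0123456789abcde')   # -> 'Collection'
#     uuid_type(api, 'zzzzz-j7d0g-0123456789abcde')   # -> 'Group'
#     uuid_type(api, 'https://example.com/data.tar')  # -> 'httpURL'
#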
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    if object_uuid.startswith("s3:"):
        return 's3URL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None


def copy_from_url(url, src, dst, args):

    project_uuid = args.project_uuid
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = to_keep_util.CheckCacheResult(None, None, None, None, None)

    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
                                               varying_url_params=args.varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        import boto3.session
        botosession = boto3.session.Session()
        cached = s3_to_keep.check_cached_url(src, botosession, project_uuid, url, {},
                                             prefer_cached_downloads=prefer_cached_downloads)

    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.http_to_keep(dst, project_uuid, url,
                                           varying_url_params=args.varying_url_params,
                                           prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        cached = s3_to_keep.s3_to_keep(dst, botosession, project_uuid, url,
                                       prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}


def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)


# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

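# Sample output of the formatters below (hypothetical values):
#
#   machine_progress: "arv-copy 12345: zzzzz-4zz18-0123456789abcde 1048576 written 4194304 total"
#   human_progress:   "\rzzzzz-4zz18-0123456789abcde: 1M / 4M 25.0% "
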
def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)

class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

if __name__ == '__main__':
    main()
COMMIT_HASH_RE = re.compile('^[0-9a-f]{1,40}$')
arvlogger = <Logger arvados (WARNING)>
keeplogger = <Logger arvados.keep (WARNING)>
logger = <Logger arvados.arv-copy (WARNING)>
googleapi_logger = <Logger googleapiclient.http (WARNING)>
collections_copied = {}
scripts_copied = set()
src_owner_uuid = None
def main():
 73def main():
 74    copy_opts = argparse.ArgumentParser(add_help=False)
 75
 76    copy_opts.add_argument(
 77        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
 78        help='Print version and exit.')
 79    copy_opts.add_argument(
 80        '-v', '--verbose', dest='verbose', action='store_true',
 81        help='Verbose output.')
 82    copy_opts.add_argument(
 83        '--progress', dest='progress', action='store_true',
 84        help='Report progress on copying collections. (default)')
 85    copy_opts.add_argument(
 86        '--no-progress', dest='progress', action='store_false',
 87        help='Do not report progress on copying collections.')
 88    copy_opts.add_argument(
 89        '-f', '--force', dest='force', action='store_true',
 90        help='Perform copy even if the object appears to exist at the remote destination.')
 91    copy_opts.add_argument(
 92        '--src', dest='source_arvados',
 93        help="""
 94Client configuration location for the source Arvados cluster.
 95May be either a configuration file path, or a plain identifier like `foo`
 96to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
 97If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
 98""",
 99    )
100    copy_opts.add_argument(
101        '--dst', dest='destination_arvados',
102        help="""
103Client configuration location for the destination Arvados cluster.
104May be either a configuration file path, or a plain identifier like `foo`
105to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
106If not provided, will use the default client configuration from the environment or `settings.conf`.
107""",
108    )
109    copy_opts.add_argument(
110        '--recursive', dest='recursive', action='store_true',
111        help='Recursively copy any dependencies for this object, and subprojects. (default)')
112    copy_opts.add_argument(
113        '--no-recursive', dest='recursive', action='store_false',
114        help='Do not copy any dependencies or subprojects.')
115    copy_opts.add_argument(
116        '--project-uuid', dest='project_uuid',
117        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
118    copy_opts.add_argument(
119        '--replication',
120        type=arv_cmd.RangedValue(int, range(1, sys.maxsize)),
121        metavar='N',
122        help="""
123Number of replicas per storage class for the copied collections at the destination.
124If not provided (or if provided with invalid value),
125use the destination's default replication-level setting (if found),
126or the fallback value 2.
127""")
128    copy_opts.add_argument(
129        '--storage-classes',
130        type=arv_cmd.UniqueSplit(),
131        help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
132    copy_opts.add_argument(
133        '--block-copy',
134        dest='keep_block_copy',
135        action='store_true',
136        help="""Copy Keep blocks when copying collections (default).""",
137    )
138    copy_opts.add_argument(
139        '--no-block-copy',
140        dest='keep_block_copy',
141        action='store_false',
142        help="""Do not copy Keep blocks when copying collections. Must have
143administrator privileges on the destination cluster to create collections.
144""")
145
146    copy_opts.add_argument("--varying-url-params", type=str, default="",
147                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
148
149    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
150                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
151    copy_opts.add_argument(
152        'object_uuid',
153        help='The UUID of the object to be copied.')
154
155    copy_opts.set_defaults(
156        # export_all_fields is used by external tools to make complete copies
157        # of Arvados records.
158        export_all_fields=False,
159        keep_block_copy=True,
160        progress=True,
161        recursive=True,
162    )
163
164    parser = argparse.ArgumentParser(
165        description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
166        parents=[copy_opts, arv_cmd.retry_opt])
167    args = parser.parse_args()
168
169    if args.verbose:
170        arvlogger.setLevel(logging.DEBUG)
171    else:
172        arvlogger.setLevel(logging.INFO)
173        keeplogger.setLevel(logging.WARNING)
174
175    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
176        args.source_arvados = args.object_uuid[:5]
177
178    if not args.destination_arvados and args.project_uuid:
179        args.destination_arvados = args.project_uuid[:5]
180
181    # Make sure errors trying to connect to clusters get logged.
182    googleapi_logger.setLevel(logging.WARN)
183    googleapi_logger.addHandler(log_handler)
184
185    # Create API clients for the source and destination instances
186    src_arv = api_for_instance(args.source_arvados, args.retries)
187    dst_arv = api_for_instance(args.destination_arvados, args.retries)
188
189    # Once we've successfully contacted the clusters, we probably
190    # don't want to see logging about retries (unless the user asked
191    # for verbose output).
192    if not args.verbose:
193        googleapi_logger.setLevel(logging.ERROR)
194
195    if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]:
196        logger.info("Copying within cluster %s", src_arv.config()["ClusterID"])
197    else:
198        logger.info("Source cluster is %s", src_arv.config()["ClusterID"])
199        logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"])
200
201    if not args.project_uuid:
202        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
203
204    # Identify the kind of object we have been given, and begin copying.
205    t = uuid_type(src_arv, args.object_uuid)
206
207    try:
208        if t == 'Collection':
209            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
210            result = copy_collection(args.object_uuid,
211                                     src_arv, dst_arv,
212                                     args)
213        elif t == 'Workflow':
214            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
215            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
216        elif t == 'Group':
217            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
218            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
219        elif t == 'httpURL' or t == 's3URL':
220            result = copy_from_url(args.object_uuid, src_arv, dst_arv, args)
221        else:
222            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
223    except Exception as e:
224        logger.error("%s", e, exc_info=args.verbose)
225        exit(1)
226
227    if not result:
228        exit(1)
229
230    # If no exception was thrown and the response does not have an
231    # error_token field, presume success
232    if result is None or 'error_token' in result or 'uuid' not in result:
233        if result:
234            logger.error("API server returned an error result: {}".format(result))
235        exit(1)
236
237    print(result['uuid'])
238
239    if result.get('partial_error'):
240        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
241        exit(1)
242
243    logger.info("Success: created copy with uuid {}".format(result['uuid']))
244    exit(0)
def set_src_owner_uuid(resource, uuid, args):
246def set_src_owner_uuid(resource, uuid, args):
247    global src_owner_uuid
248    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
249    src_owner_uuid = c.get("owner_uuid")
def api_for_instance(instance_name, num_retries):
263def api_for_instance(instance_name, num_retries):
264    msg = []
265    if instance_name:
266        if '/' in instance_name:
267            config_file = instance_name
268        else:
269            dirs = basedirs.BaseDirectories('CONFIG')
270            config_file = next(dirs.search(f'{instance_name}.conf'), '')
271
272        arvados.api._reset_googleapiclient_logging()
273        try:
274            cfg = arvados.config.load(config_file)
275
276            if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
277                api_is_insecure = (
278                    cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
279                        ['1', 't', 'true', 'y', 'yes']))
280                return arvados.api('v1',
281                                     host=cfg['ARVADOS_API_HOST'],
282                                     token=cfg['ARVADOS_API_TOKEN'],
283                                     insecure=api_is_insecure,
284                                     num_retries=num_retries,
285                                     )
286            else:
287                msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file))
288        except OSError as e:
289            if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH):
290                verb = 'connect to instance from'
291            elif config_file:
292                verb = 'open'
293            else:
294                verb = 'find'
295                searchlist = ":".join(str(p) for p in dirs.search_paths())
296                config_file = f'{instance_name}.conf in path {searchlist}'
297            msg.append(("Could not {} config file {}: {}").format(
298                       verb, config_file, e.strerror))
299        except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e:
300            msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e))
301
302    arvados.api._reset_googleapiclient_logging()
303    default_api = None
304    default_instance = None
305    try:
306        default_api = arvados.api('v1', num_retries=num_retries)
307        default_instance = default_api.config()["ClusterID"]
308    except ValueError:
309        pass
310    except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e:
311        msg.append("Failed to connect to default instance, error was {}".format(e))
312
313    if default_api is not None and (not instance_name or instance_name == default_instance):
314        # Use default settings
315        return default_api
316
317    if instance_name and default_instance and instance_name != default_instance:
318        msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name))
319
320    for m in msg:
321        logger.error(m)
322
323    abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')
def check_git_availability():
326def check_git_availability():
327    try:
328        subprocess.run(
329            ['git', '--version'],
330            check=True,
331            stdout=subprocess.DEVNULL,
332        )
333    except FileNotFoundError:
334        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
337def filter_iter(arg):
338    """Iterate a filter string-or-list.
339
340    Pass in a filter field that can either be a string or list.
341    This will iterate elements as if the field had been written as a list.
342    """
343    if isinstance(arg, str):
344        yield arg
345    else:
346        yield from arg

348def migrate_repository_filter(repo_filter, src_repository, dst_repository):
349    """Update a single repository filter in-place for the destination.
350
351    If the filter checks that the repository is src_repository, it is
352    updated to check that the repository is dst_repository.  If it does
353    anything else, this function raises ValueError.
354    """
355    if src_repository is None:
356        raise ValueError("component does not specify a source repository")
357    elif dst_repository is None:
358        raise ValueError("no destination repository specified to update repository filter")
359    elif repo_filter[1:] == ['=', src_repository]:
360        repo_filter[2] = dst_repository
361    elif repo_filter[1:] == ['in', [src_repository]]:
362        repo_filter[2] = [dst_repository]
363    else:
364        raise ValueError("repository filter is not a simple source match")
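
A quick sketch of the in-place update (hypothetical repository names):

    >>> f = ['repository', '=', 'alice/crunch']
    >>> migrate_repository_filter(f, 'alice/crunch', 'bob/crunch')
    >>> f
    ['repository', '=', 'bob/crunch']
    >>> migrate_repository_filter(['repository', 'like', '%crunch%'],
    ...                           'alice/crunch', 'bob/crunch')
    Traceback (most recent call last):
    ...
    ValueError: repository filter is not a simple source match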

366def migrate_script_version_filter(version_filter):
367    """Update a single script_version filter in-place for the destination.
368
369    Currently this function checks that all the filter operands are Git
370    commit hashes.  If they're not, it raises ValueError to indicate that
371    the filter is not portable.  It could be extended to make other
372    transformations in the future.
373    """
374    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
375        raise ValueError("script_version filter is not limited to commit hashes")
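
For instance, a filter limited to commit hashes passes through
unchanged, while a branch name is rejected (illustrative filters):

    >>> migrate_script_version_filter(
    ...     ['script_version', 'in', ['acbd18db4cc2f85cedef654fccc4a4d8']])
    >>> migrate_script_version_filter(['script_version', '=', 'main'])
    Traceback (most recent call last):
    ...
    ValueError: script_version filter is not limited to commit hashes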

377def attr_filtered(filter_, *attr_names):
378    """Return True if filter_ applies to any of attr_names, else False."""
379    return any((name == 'any') or (name in attr_names)
380               for name in filter_iter(filter_[0]))
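
For example (illustrative filters):

    >>> attr_filtered(['script_version', '=', 'main'], 'script_version')
    True
    >>> attr_filtered(['any', '=', 'x'], 'docker_image_locator')
    True
    >>> attr_filtered(['owner_uuid', '=', 'x'], 'repository')
    False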

382@contextlib.contextmanager
383def exception_handler(handler, *exc_types):
384    """If any exc_types are raised in the block, call handler on the exception."""
385    try:
386        yield
387    except exc_types as error:
388        handler(error)
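
Typical usage wraps a call whose expected failures should be logged
rather than propagated; a minimal sketch (scratch_dir is a
hypothetical variable):

    with exception_handler(logger.error, OSError):
        shutil.rmtree(scratch_dir)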

403def copy_workflow(wf_uuid, src, dst, args):
404    # fetch the workflow from the source instance
405    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
406
407    if not wf["definition"]:
408        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
409
410    # copy collections and docker images
411    if args.recursive and wf["definition"]:
412        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
413               "ARVADOS_API_TOKEN": src.api_token,
414               "PATH": os.environ["PATH"]}
415        try:
416            result = subprocess.run(
417                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
418                env=env,
419                stdout=subprocess.PIPE,
420                universal_newlines=True,
421            )
422        except FileNotFoundError:
423            no_arv_copy = True
424        else:
425            no_arv_copy = result.returncode == 2
426
427        if no_arv_copy:
428            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
429        elif result.returncode != 0:
430            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
431
432        locations = json.loads(result.stdout)
433
434        if locations:
435            copy_collections(locations, src, dst, args)
436
437    # copy the workflow itself
438    del wf['uuid']
439    wf['owner_uuid'] = args.project_uuid
440
441    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
442                                             ["name", "=", wf["name"]]]).execute()
443    if len(existing["items"]) == 0:
444        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
445    else:
446        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
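
The --print-keep-deps invocation above is expected to print a JSON
list of Keep references used by the workflow, which is passed straight
to copy_collections(); for example (illustrative output):

    ["fa7aeb5140e2848d39b416daeef4ffc5+45", "zzzzz-4zz18-zzzzzzzzzzzzzzz"]
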
449def workflow_collections(obj, locations, docker_images):
450    if isinstance(obj, dict):
451        loc = obj.get('location', None)
452        if loc is not None:
453            if loc.startswith("keep:"):
454                locations.append(loc[5:])
455
456        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
457        if docker_image is not None:
458            ds = docker_image.split(":", 1)
459            tag = ds[1] if len(ds)==2 else 'latest'
460            docker_images[ds[0]] = tag
461
462        for x in obj:
463            workflow_collections(obj[x], locations, docker_images)
464    elif isinstance(obj, list):
465        for x in obj:
466            workflow_collections(x, locations, docker_images)
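
For example, scanning a small CWL-like fragment collects both kinds of
references (illustrative input):

    locations, docker_images = [], {}
    workflow_collections(
        {'location': 'keep:fa7aeb5140e2848d39b416daeef4ffc5+45/input.txt',
         'hints': [{'dockerPull': 'ubuntu:22.04'}]},
        locations, docker_images)
    # locations     -> ['fa7aeb5140e2848d39b416daeef4ffc5+45/input.txt']
    # docker_images -> {'ubuntu': '22.04'}
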
480def copy_collections(obj, src, dst, args):
481
482    def copy_collection_fn(collection_match):
483        """Helper function for regex substitution: copies a single collection,
484        identified by the collection_match MatchObject, to the
485        destination.  Returns the destination collection uuid (or the
486        portable data hash if that's what src_id is).
487
488        """
489        src_id = collection_match.group(0)
490        if src_id not in collections_copied:
491            dst_col = copy_collection(src_id, src, dst, args)
492            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
493                collections_copied[src_id] = src_id
494            else:
495                collections_copied[src_id] = dst_col['uuid']
496        return collections_copied[src_id]
497
498    if isinstance(obj, str):
499        # Copy any collections identified in this string to dst, replacing
500        # them with the dst uuids as necessary.
501        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
502        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
503        return obj
504    elif isinstance(obj, dict):
505        return type(obj)((v, copy_collections(obj[v], src, dst, args))
506                         for v in obj)
507    elif isinstance(obj, list):
508        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
509    return obj
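
The string case means collection identifiers embedded anywhere in text
are rewritten as well; conceptually the substitution behaves like this
(hypothetical UUIDs, with collections_copied already populated):

    >>> collections_copied['zzzzz-4zz18-aaaaaaaaaaaaaaa'] = 'qqqqq-4zz18-bbbbbbbbbbbbbbb'
    >>> arvados.util.collection_uuid_pattern.sub(
    ...     lambda m: collections_copied[m.group(0)],
    ...     'input is keep:zzzzz-4zz18-aaaaaaaaaaaaaaa/x.txt')
    'input is keep:qqqqq-4zz18-bbbbbbbbbbbbbbb/x.txt'
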
512def total_collection_size(manifest_text):
513    """Return the total number of bytes in this collection (excluding
514    duplicate blocks)."""
515
516    total_bytes = 0
517    locators_seen = {}
518    for line in manifest_text.splitlines():
519        words = line.split()
520        for word in words[1:]:
521            try:
522                loc = arvados.KeepLocator(word)
523            except ValueError:
524                continue  # this word isn't a locator, skip it
525            if loc.md5sum not in locators_seen:
526                locators_seen[loc.md5sum] = True
527                total_bytes += loc.size
528
529    return total_bytes
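
For example, duplicate blocks are only counted once even when they
appear in multiple streams (illustrative manifest):

    >>> total_collection_size(
    ...     ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"
    ...     "./sub acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar.txt\n")
    3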

531def create_collection_from(c, src, dst, args):
532    """Create a new collection record on dst, and copy Docker metadata if
533    available."""
534
535    collection_uuid = c['uuid']
536    if args.export_all_fields:
537        body = c.copy()
538    else:
539        body = {key: c[key] for key in [
540            'description',
541            'manifest_text',
542            'name',
543            'portable_data_hash',
544            'properties',
545        ]}
546        if not body['name']:
547            body['name'] = f"copied from {collection_uuid}"
548        if args.storage_classes:
549            body['storage_classes_desired'] = args.storage_classes
550        body['owner_uuid'] = args.project_uuid
551
552    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
553
554    # Create docker_image_repo+tag and docker_image_hash links
555    # at the destination.
556    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
557        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
558
559        for src_link in docker_links:
560            body = {key: src_link[key]
561                    for key in ['link_class', 'name', 'properties']}
562            body['head_uuid'] = dst_collection['uuid']
563            body['owner_uuid'] = args.project_uuid
564
565            lk = dst.links().create(body=body).execute(num_retries=args.retries)
566            logger.debug('created dst link {}'.format(lk))
567
568    return dst_collection

592def copy_collection(obj_uuid, src, dst, args):
593    if arvados.util.keep_locator_pattern.match(obj_uuid):
594        # If the obj_uuid is a portable data hash, it might not be
595        # uniquely identified with a particular collection.  As a
596        # result, it is ambiguous as to what name to use for the copy.
597        # Apply some heuristics to pick which collection to get the
598        # name from.
599        srccol = src.collections().list(
600            filters=[['portable_data_hash', '=', obj_uuid]],
601            order="created_at asc"
602            ).execute(num_retries=args.retries)
603
604        items = srccol.get("items")
605
606        if not items:
607            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
608            return
609
610        c = None
611
612        if len(items) == 1:
613            # There's only one collection with the PDH, so use that.
614            c = items[0]
615        if not c:
616            # See if there is a collection that's in the same project
617            # as the root item (usually a workflow) being copied.
618            for i in items:
619                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
620                    c = i
621                    break
622        if not c:
623            # Didn't find any collections located in the same project, so
624            # pick the oldest collection that has a name assigned to it.
625            for i in items:
626                if i.get("name"):
627                    c = i
628                    break
629        if not c:
630            # None of the collections have names (?!), so just pick the
631            # first one.
632            c = items[0]
633
634        # list() doesn't return manifest text (and we don't want it to,
635    # because we don't need the same manifest text sent to us 50
636        # times) so go and retrieve the collection object directly
637        # which will include the manifest text.
638        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
639    else:
640        # Assume this is an actual collection uuid, so fetch it directly.
641        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
642
643    # If a collection with this hash already exists at the
644    # destination, and 'force' is not true, just return that
645    # collection.
646    if not args.force:
647        if 'portable_data_hash' in c:
648            colhash = c['portable_data_hash']
649        else:
650            colhash = c['uuid']
651        dstcol = dst.collections().list(
652            filters=[['portable_data_hash', '=', colhash]]
653        ).execute(num_retries=args.retries)
654        if dstcol['items_available'] > 0:
655            for d in dstcol['items']:
656                if ((args.project_uuid == d['owner_uuid']) and
657                    (c.get('name') == d['name']) and
658                    (c['portable_data_hash'] == d['portable_data_hash'])):
659                    return d
660            c['manifest_text'] = dst.collections().get(
661                uuid=dstcol['items'][0]['uuid']
662            ).execute(num_retries=args.retries)['manifest_text']
663            return create_collection_from(c, src, dst, args)
664
665    if args.replication is None:
666        # Obtain default or fallback collection replication setting on the
667        # destination
668        try:
669            args.replication = int(dst.config()["Collections"]["DefaultReplication"])
670        except (KeyError, TypeError, ValueError):
671            args.replication = 2
672
673    # Fetch the collection's manifest.
674    manifest = c['manifest_text']
675    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
676
677    # Copy each block from src_keep to dst_keep.
678    # Use the newly signed locators returned from dst_keep to build
679    # a new manifest as we go.
680    src_keep = src.keep
681    dst_keep = dst.keep
682    dst_manifest = io.StringIO()
683    dst_locators = {}
684    bytes_written = 0
685    bytes_expected = total_collection_size(manifest)
686    if args.progress:
687        progress_writer = ProgressWriter(human_progress)
688    else:
689        progress_writer = None
690
691    # go through the words
692    # put each block loc into 'get' queue
693    # 'get' threads get block and put it into 'put' queue
694    # 'put' threads put block and then update dst_locators
695    #
696    # after going through the whole manifest we go back through it
697    # again and build dst_manifest
698
699    lock = threading.Lock()
700
701    # the get queue should be unbounded because we'll add all the
702    # block hashes we want to get, but these are small
703    get_queue = queue.Queue()
704
705    threadcount = 4
706
707    # the put queue contains full data blocks
708    # and if 'get' is faster than 'put' we could end up consuming
709    # a great deal of RAM if it isn't bounded.
710    put_queue = queue.Queue(threadcount)
711    transfer_error = []
712
713    def get_thread():
714        while True:
715            word = get_queue.get()
716            if word is None:
717                put_queue.put(None)
718                get_queue.task_done()
719                return
720
721            blockhash = arvados.KeepLocator(word).md5sum
722            with lock:
723                if blockhash in dst_locators:
724                    # Already uploaded
725                    get_queue.task_done()
726                    continue
727
728            try:
729                logger.debug("Getting block %s", word)
730                data = src_keep.get(word)
731                put_queue.put((word, data))
732            except Exception as e:
733                logger.error("Error getting block %s: %s", word, e)
734                transfer_error.append(e)
735                try:
736                    # Drain the 'get' queue so we end early
737                    while True:
738                        get_queue.get(False)
739                        get_queue.task_done()
740                except queue.Empty:
741                    pass
742            finally:
743                get_queue.task_done()
744
745    def put_thread():
746        nonlocal bytes_written
747        while True:
748            item = put_queue.get()
749            if item is None:
750                put_queue.task_done()
751                return
752
753            word, data = item
754            loc = arvados.KeepLocator(word)
755            blockhash = loc.md5sum
756            with lock:
757                if blockhash in dst_locators:
758                    # Already uploaded
759                    put_queue.task_done()
760                    continue
761
762            try:
763                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
764                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
765                with lock:
766                    dst_locators[blockhash] = dst_locator
767                    bytes_written += loc.size
768                    if progress_writer:
769                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
770            except Exception as e:
771                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
772                try:
773                    # Drain the 'get' queue so we end early
774                    while True:
775                        get_queue.get(False)
776                        get_queue.task_done()
777                except queue.Empty:
778                    pass
779                transfer_error.append(e)
780            finally:
781                put_queue.task_done()
782
783    if args.keep_block_copy:
784        for line in manifest.splitlines():
785            words = line.split()
786            for word in words[1:]:
787                try:
788                    loc = arvados.KeepLocator(word)
789                except ValueError:
790                    # If 'word' can't be parsed as a locator,
791                    # presume it's a filename.
792                    continue
793
794                get_queue.put(word)
795
796        for _ in range(threadcount):
797            get_queue.put(None)
798
799        for _ in range(threadcount):
800            threading.Thread(target=get_thread, daemon=True).start()
801
802        for _ in range(threadcount):
803            threading.Thread(target=put_thread, daemon=True).start()
804
805        get_queue.join()
806        put_queue.join()
807
808        if len(transfer_error) > 0:
809            return {"error_token": "Failed to transfer blocks"}
810
811    for line in manifest.splitlines():
812        words = iter(line.split())
813        out_words = [next(words)]
814        for word in words:
815            try:
816                loc = arvados.KeepLocator(word)
817            except ValueError:
818                # If 'word' can't be parsed as a locator,
819                # presume it's a filename.
820                out_words.append(word)
821            else:
822                if args.keep_block_copy:
823                    out_words.append(dst_locators[loc.md5sum])
824                else:
825                    out_words.append(loc.stripped())
826        dst_manifest.write(' '.join(out_words))
827        dst_manifest.write("\n")
828
829    if progress_writer:
830        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
831        progress_writer.finish()
832
833    # Copy the manifest and save the collection.
834    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
835
836    c['manifest_text'] = dst_manifest.getvalue()
837    return create_collection_from(c, src, dst, args)
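
The block-transfer section above is a bounded producer/consumer
pipeline: each 'get' thread forwards one None sentinel to shut down
one 'put' thread, and the bounded put queue holds at most threadcount
data blocks in RAM at once. A stripped-down sketch of the same
shutdown and backpressure pattern (fetch and store are hypothetical
stand-ins for src_keep.get and dst_keep.put):

    import queue
    import threading

    def pipeline(items, fetch, store, threadcount=4):
        get_q = queue.Queue()             # unbounded; holds small locators
        put_q = queue.Queue(threadcount)  # bounded; holds full data blocks

        def getter():
            while True:
                item = get_q.get()
                if item is None:
                    put_q.put(None)       # forward one sentinel per thread
                    get_q.task_done()
                    return
                put_q.put((item, fetch(item)))
                get_q.task_done()

        def putter():
            while True:
                item = put_q.get()
                if item is None:
                    put_q.task_done()
                    return
                key, data = item
                store(key, data)
                put_q.task_done()

        for item in items:
            get_q.put(item)
        for _ in range(threadcount):
            get_q.put(None)
        for _ in range(threadcount):
            threading.Thread(target=getter, daemon=True).start()
            threading.Thread(target=putter, daemon=True).start()
        get_q.join()
        put_q.join()
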
839def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
840    """Copy the docker image identified by docker_image and
841    docker_image_tag from src to dst. Create appropriate
842    docker_image_repo+tag and docker_image_hash links at dst.
843
844    """
845
846    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
847
848    # Find the link identifying this docker image.
849    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
850        src, args.retries, docker_image, docker_image_tag)
851    if docker_image_list:
852        image_uuid, image_info = docker_image_list[0]
853        logger.debug('copying collection {} {}'.format(image_uuid, image_info))
854
855        # Copy the collection it refers to.
856        dst_image_col = copy_collection(image_uuid, src, dst, args)
857    elif arvados.util.keep_locator_pattern.match(docker_image):
858        dst_image_col = copy_collection(docker_image, src, dst, args)
859    else:
860        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

862def copy_project(obj_uuid, src, dst, owner_uuid, args):
863
864    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
865
866    # Create/update the destination project
867    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
868                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
869    try:
870        existing_uuid = existing['items'][0]['uuid']
871    except IndexError:
872        body = src_project_record if args.export_all_fields else {'group': {
873            'description': src_project_record['description'],
874            'group_class': 'project',
875            'name': src_project_record['name'],
876            'owner_uuid': owner_uuid,
877        }}
878        project_req = dst.groups().create(body=body)
879    else:
880        project_req = dst.groups().update(
881            uuid=existing_uuid,
882            body={'group': {
883                'description': src_project_record['description'],
884            }},
885        )
886
887    project_record = project_req.execute(num_retries=args.retries)
888    args.project_uuid = project_record["uuid"]
889    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
890
891    partial_error = ""
892    # Copy collections
893    try:
894        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
895                         src, dst, args)
896    except Exception as e:
897        partial_error += "\n" + str(e)
898
899    # Copy workflows
900    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
901        try:
902            copy_workflow(w["uuid"], src, dst, args)
903        except Exception as e:
904            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
905
906    if args.recursive:
907        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
908            try:
909                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
910            except Exception as e:
911                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
912
913    project_record["partial_error"] = partial_error
914
915    return project_record
923def git_rev_parse(rev, repo):
924    proc = subprocess.run(
925        ['git', 'rev-parse', rev],
926        check=True,
927        cwd=repo,
928        stdout=subprocess.PIPE,
929        text=True,
930    )
931    return proc.stdout.strip()  # proc.stdout is already a str (text=True)
943def uuid_type(api, object_uuid):
944    if re.match(arvados.util.keep_locator_pattern, object_uuid):
945        return 'Collection'
946
947    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
948        return 'httpURL'
949
950    if object_uuid.startswith("s3:"):
951        return 's3URL'
952
953    p = object_uuid.split('-')
954    if len(p) == 3:
955        type_prefix = p[1]
956        for k in api._schema.schemas:
957            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
958            if type_prefix == obj_class:
959                return k
960    return None
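
For example (illustrative identifiers; the last case is resolved by
looking up the UUID's middle section in the API schema, where 4zz18 is
typically the Collection prefix):

    uuid_type(api, 'fa7aeb5140e2848d39b416daeef4ffc5+45')  # 'Collection'
    uuid_type(api, 'https://example.com/data.tar.gz')      # 'httpURL'
    uuid_type(api, 's3://bucket/key')                      # 's3URL'
    uuid_type(api, 'zzzzz-4zz18-zzzzzzzzzzzzzzz')          # 'Collection'
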
963def copy_from_url(url, src, dst, args):
964
965    project_uuid = args.project_uuid
966    # Honor the user's preference for reusing previously cached downloads.
967    prefer_cached_downloads = args.prefer_cached_downloads
968
969    cached = to_keep_util.CheckCacheResult(None, None, None, None, None)
970
971    if url.startswith("http:") or url.startswith("https:"):
972        cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
973                                               varying_url_params=args.varying_url_params,
974                                               prefer_cached_downloads=prefer_cached_downloads)
975    elif url.startswith("s3:"):
976        import boto3.session
977        botosession = boto3.session.Session()
978        cached = s3_to_keep.check_cached_url(src, botosession, project_uuid, url, {},
979                                             prefer_cached_downloads=prefer_cached_downloads)
980
981    if cached[2] is not None:
982        return copy_collection(cached[2], src, dst, args)
983
984    if url.startswith("http:") or url.startswith("https:"):
985        cached = http_to_keep.http_to_keep(dst, project_uuid, url,
986                                           varying_url_params=args.varying_url_params,
987                                           prefer_cached_downloads=prefer_cached_downloads)
988    elif url.startswith("s3:"):
989        cached = s3_to_keep.s3_to_keep(dst, botosession, project_uuid, url,
990                                       prefer_cached_downloads=prefer_cached_downloads)
991
992    if cached is not None:
993        return {"uuid": cached[2]}
996def abort(msg, code=1):
997    logger.info("arv-copy: %s", msg)
998    sys.exit(code)
1007def machine_progress(obj_uuid, bytes_written, bytes_expected):
1008    return "{} {}: {} {} written {} total\n".format(
1009        sys.argv[0],
1010        os.getpid(),
1011        obj_uuid,
1012        bytes_written,
1013        -1 if (bytes_expected is None) else bytes_expected)
1015def human_progress(obj_uuid, bytes_written, bytes_expected):
1016    if bytes_expected:
1017        return "\r{}: {}M / {}M {:.1%} ".format(
1018            obj_uuid,
1019            bytes_written >> 20, bytes_expected >> 20,
1020            float(bytes_written) / bytes_expected)
1021    else:
1022        return "\r{}: {} ".format(obj_uuid, bytes_written)
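
Sample renderings (illustrative UUID; argv[0] and the PID vary at
runtime):

    human_progress('zzzzz-4zz18-zzzzzzzzzzzzzzz', 50 << 20, 100 << 20)
    # -> '\rzzzzz-4zz18-zzzzzzzzzzzzzzz: 50M / 100M 50.0% '

    machine_progress('zzzzz-4zz18-zzzzzzzzzzzzzzz', 52428800, 104857600)
    # -> 'arv-copy 12345: zzzzz-4zz18-zzzzzzzzzzzzzzz 52428800 written 104857600 total\n'
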
1024class ProgressWriter:
1025    _progress_func = None
1026    outfile = sys.stderr
1027
1028    def __init__(self, progress_func):
1029        self._progress_func = progress_func
1030
1031    def report(self, obj_uuid, bytes_written, bytes_expected):
1032        if self._progress_func is not None:
1033            self.outfile.write(
1034                self._progress_func(obj_uuid, bytes_written, bytes_expected))
1035
1036    def finish(self):
1037        self.outfile.write("\n")