arvados.commands.arv_copy

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# arv-copy [--recursive] [--no-recursive] object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a workflow, arv-copy copies the workflow,
# input collections, and docker images). If
# --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have configuration files {src}.conf and
# {dst}.conf in a standard configuration directory with valid login credentials
# for instances src and dst.  If either of these files is not found,
# arv-copy will issue an error.

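# For illustration only: a minimal {src}.conf might look like the
# following (hypothetical host and token values; the keys are the same
# ARVADOS_API_* settings that api_for_instance() reads below).
#
#   ARVADOS_API_HOST=zzzzz.example.com
#   ARVADOS_API_TOKEN=v2/zzzzz-gj3su-xxxxxxxxxxxxxxx/itsasecret
#   ARVADOS_API_HOST_INSECURE=false
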
import argparse
import contextlib
import getpass
import os
import re
import shutil
import subprocess
import sys
import logging
import tempfile
import urllib.parse
import io
import json
import queue
import threading
import errno

import httplib2.error
# Import the errors submodule explicitly; a bare "import googleapiclient"
# does not guarantee that googleapiclient.errors (used below) is loaded.
import googleapiclient.errors

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
from arvados.logging import log_handler

from arvados._internal import basedirs, http_to_keep, s3_to_keep, to_keep_util
from arvados._version import __version__

COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

arvlogger = logging.getLogger('arvados')
keeplogger = logging.getLogger('arvados.keep')
logger = logging.getLogger('arvados.arv-copy')

# Set this up so connection errors get logged.
googleapi_logger = logging.getLogger('googleapiclient.http')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# directories.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
#
local_repo_dir = {}

# Collections that have been copied in this session, mapped to their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None

def main():
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--src', dest='source_arvados',
        help="""
Client configuration location for the source Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
""",
    )
    copy_opts.add_argument(
        '--dst', dest='destination_arvados',
        help="""
Client configuration location for the destination Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will use the default client configuration from the environment or `settings.conf`.
""",
    )
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object, and subprojects. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies or subprojects.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
    copy_opts.add_argument(
        '--replication',
        type=arv_cmd.RangedValue(int, range(1, sys.maxsize)),
        metavar='N',
        help="""
Number of replicas per storage class for the copied collections at the destination.
If not provided (or if provided with an invalid value),
use the destination's default replication-level setting (if found),
or the fallback value 2.
""")
    copy_opts.add_argument(
        '--storage-classes',
        type=arv_cmd.UniqueSplit(),
        help='Comma-separated list of storage classes to be used when saving data to the destination Arvados instance.')
    copy_opts.add_argument(
        "--varying-url-params", type=str, default="",
        help="A comma-separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")

    copy_opts.add_argument(
        "--prefer-cached-downloads", action="store_true", default=False,
        help="If an HTTP URL is found in Keep, skip the upstream URL freshness check (will not notice if the upstream has changed, but also will not error if the upstream is unavailable).")

    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a workflow, collection, or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        arvlogger.setLevel(logging.DEBUG)
    else:
        arvlogger.setLevel(logging.INFO)
        keeplogger.setLevel(logging.WARNING)

    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
        args.source_arvados = args.object_uuid[:5]

    if not args.destination_arvados and args.project_uuid:
        args.destination_arvados = args.project_uuid[:5]

    # Make sure errors trying to connect to clusters get logged.
    googleapi_logger.setLevel(logging.WARNING)
    googleapi_logger.addHandler(log_handler)

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados, args.retries)
    dst_arv = api_for_instance(args.destination_arvados, args.retries)

    # Once we've successfully contacted the clusters, we probably
    # don't want to see logging about retries (unless the user asked
    # for verbose output).
    if not args.verbose:
        googleapi_logger.setLevel(logging.ERROR)

    if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]:
        logger.info("Copying within cluster %s", src_arv.config()["ClusterID"])
    else:
        logger.info("Source cluster is %s", src_arv.config()["ClusterID"])
        logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"])

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)

    try:
        if t == 'Collection':
            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
            result = copy_collection(args.object_uuid,
                                     src_arv, dst_arv,
                                     args)
        elif t == 'Workflow':
            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
        elif t == 'Group':
            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
        elif t == 'httpURL' or t == 's3URL':
            result = copy_from_url(args.object_uuid, src_arv, dst_arv, args)
        else:
            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
    except Exception as e:
        logger.error("%s", e, exc_info=args.verbose)
        exit(1)

    # Clean up any outstanding temp git repositories.
    for d in local_repo_dir.values():
        shutil.rmtree(d, ignore_errors=True)

    if not result:
        exit(1)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if result is None or 'error_token' in result or 'uuid' not in result:
        if result:
            logger.error("API server returned an error result: {}".format(result))
        exit(1)

    print(result['uuid'])

    if result.get('partial_error'):
        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)

def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")

# api_for_instance(instance_name, num_retries)
#
#     Creates an API client for the Arvados instance identified by
#     instance_name.
#
#     If instance_name contains a slash, it is presumed to be a path
#     (either relative or absolute) to a file with Arvados configuration
#     settings.
#
#     Otherwise, it is presumed to be the name of a file in a standard
#     configuration directory.
#
def api_for_instance(instance_name, num_retries):
    msg = []
    if instance_name:
        if '/' in instance_name:
            config_file = instance_name
        else:
            dirs = basedirs.BaseDirectories('CONFIG')
            config_file = next(dirs.search(f'{instance_name}.conf'), '')

        try:
            cfg = arvados.config.load(config_file)

            if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
                api_is_insecure = (
                    cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                        ['1', 't', 'true', 'y', 'yes']))
                return arvados.api('v1',
                                   host=cfg['ARVADOS_API_HOST'],
                                   token=cfg['ARVADOS_API_TOKEN'],
                                   insecure=api_is_insecure,
                                   num_retries=num_retries,
                                   )
            else:
                msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file))
        except OSError as e:
            if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH):
                verb = 'connect to instance from'
            elif config_file:
                verb = 'open'
            else:
                verb = 'find'
                searchlist = ":".join(str(p) for p in dirs.search_paths())
                config_file = f'{instance_name}.conf in path {searchlist}'
            msg.append("Could not {} config file {}: {}".format(
                       verb, config_file, e.strerror))
        except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e:
            msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e))

    default_api = None
    default_instance = None
    try:
        default_api = arvados.api('v1', num_retries=num_retries)
        default_instance = default_api.config()["ClusterID"]
    except ValueError:
        pass
    except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e:
        msg.append("Failed to connect to default instance, error was {}".format(e))

    if default_api is not None and (not instance_name or instance_name == default_instance):
        # Use default settings
        return default_api

    if instance_name and default_instance and instance_name != default_instance:
        msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name))

    for m in msg:
        logger.error(m)

    abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')

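# For example (hypothetical names): api_for_instance('pirca', 3) searches
# the standard configuration directories for pirca.conf;
# api_for_instance('./staging.conf', 3) loads that file directly (any
# name containing a slash is treated as a path); and
# api_for_instance(None, 3) falls back to the default client
# configuration from the environment or settings.conf.
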
# Check if git is available
def check_git_availability():
    try:
        subprocess.run(
            ['git', '--version'],
            check=True,
            stdout=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        abort('git command is not available. Please ensure git is installed.')


def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, str):
        yield arg
    else:
        yield from arg

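# For example, list(filter_iter('script_version')) yields
# ['script_version'], while list(filter_iter(['a', 'b'])) yields
# ['a', 'b'].
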
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")

def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")

def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))

@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)

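# A minimal usage sketch (scratch_dir is a hypothetical path); the failed
# cleanup is logged instead of propagating:
#
#   with exception_handler(logger.warning, OSError):
#       shutil.rmtree(scratch_dir)
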
# copy_workflow(wf_uuid, src, dst, args)
#
#    Copies a workflow identified by wf_uuid from src to dst.
#
#    If args.recursive is True, also copy any collections
#      referenced in the workflow definition yaml.
#
#    The owner_uuid of the new workflow is set to any given
#      project_uuid or the user who copied the template.
#
#    Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        try:
            result = subprocess.run(
                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
                env=env,
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
        except FileNotFoundError:
            no_arv_copy = True
        else:
            no_arv_copy = result.returncode == 2

        if no_arv_copy:
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        locations = json.loads(result.stdout)

        if locations:
            copy_collections(locations, src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid

    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    else:
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)

def workflow_collections(obj, locations, docker_images):
    """Recursively scan a workflow definition for Keep data and Docker images.

    Appends any "keep:" locations found to the locations list, and records
    image/tag pairs in the docker_images dict.
    """
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)

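# For example, a node like {"dockerPull": "ubuntu:20.04"} records
# docker_images["ubuntu"] = "20.04", and {"location": "keep:<pdh>/a.fastq"}
# appends "<pdh>/a.fastq" to locations.
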
# copy_collections(obj, src, dst, args)
#
#    Recursively copies all collections referenced by 'obj' from src
#    to dst.  obj may be a dict or a list, in which case we run
#    copy_collections on every value it contains. If it is a string,
#    search it for any substring that matches a collection hash or uuid
#    (this will find hidden references to collections like
#      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
#    Returns a copy of obj with any old collection uuids replaced by
#    the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).

        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, str):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj

def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes

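# For example, given a manifest consisting of the single line
#   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
# total_collection_size() parses the locator's "+3" size hint and returns 3,
# counting each distinct block once even if it appears in several streams.
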
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    body = {}
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        body[d] = c[d]

    if not body["name"]:
        body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection

# copy_collection(obj_uuid, src, dst, args)
#
#    Copies the collection identified by obj_uuid from src to dst.
#    Returns the collection object created at dst.
#
#    If args.progress is True, produce a human-friendly progress
#    report.
#
#    If a collection with the desired portable_data_hash already
#    exists at dst, and args.force is False, copy_collection returns
#    the existing collection without copying any blocks.  Otherwise
#    (if no collection exists or if args.force is True)
#    copy_collection copies all of the collection data blocks from src
#    to dst.
#
#    For this application, it is critical to preserve the
#    collection's manifest hash, which is not guaranteed with the
#    arvados.CollectionReader and arvados.CollectionWriter classes.
#    Copying each block in the collection manually, followed by
#    the manifest block, ensures that the collection's manifest
#    hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    if args.replication is None:
        # Obtain default or fallback collection replication setting on the
        # destination
        try:
            args.replication = int(dst.config()["Collections"]["DefaultReplication"])
        except (KeyError, TypeError, ValueError):
            args.replication = 2

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest

    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    threadcount = 4

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()

    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()

    for line in manifest.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                continue

            get_queue.put(word)

    for i in range(0, threadcount):
        get_queue.put(None)

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    get_queue.join()
    put_queue.join()

    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest.write(words[0])
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
                continue
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)

def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

def copy_project(obj_uuid, src, dst, owner_uuid, args):
    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    else:
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        body={"group": {
                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record

# git_rev_parse(rev, repo)
#
#    Returns the 40-character commit hash corresponding to 'rev' in
#    git repository 'repo' (which must be the path of a local git
#    repository)
#
def git_rev_parse(rev, repo):
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        text=True,
    )
    # With text=True, proc.stdout is already a str, not a file object.
    return proc.stdout.strip()

# uuid_type(api, object_uuid)
#
#    Returns the name of the class that object_uuid belongs to, based on
#    the second field of the uuid.  This function consults the api's
#    schema to identify the object class.
#
#    It returns a string such as 'Collection', 'Workflow', etc.
#
#    Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    if object_uuid.startswith("s3:"):
        return 's3URL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None

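# For example, a UUID like 'zzzzz-4zz18-xxxxxxxxxxxxxxx' has the type
# prefix '4zz18', which the schema lookup above maps to 'Collection';
# the exact prefix-to-class mapping comes from the cluster's discovery
# document.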

def copy_from_url(url, src, dst, args):
    project_uuid = args.project_uuid
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = to_keep_util.CheckCacheResult(None, None, None, None, None)

    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
                                               varying_url_params=args.varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        import boto3.session
        botosession = boto3.session.Session()
        cached = s3_to_keep.check_cached_url(src, botosession, project_uuid, url, {},
                                             prefer_cached_downloads=prefer_cached_downloads)

    # If the URL has already been cached in a collection on the source
    # cluster, copy that collection to the destination.
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    # Otherwise, download the URL directly into Keep on the destination.
    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.http_to_keep(dst, project_uuid, url,
                                           varying_url_params=args.varying_url_params,
                                           prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        cached = s3_to_keep.s3_to_keep(dst, botosession, project_uuid, url,
                                       prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}

def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)


# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)

class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

if __name__ == '__main__':
    main()
COMMIT_HASH_RE = re.compile('^[0-9a-f]{1,40}$')
arvlogger = <Logger arvados (WARNING)>
keeplogger = <Logger arvados.keep (WARNING)>
logger = <Logger arvados.arv-copy (WARNING)>
googleapi_logger = <Logger googleapiclient.http (WARNING)>
local_repo_dir = {}
collections_copied = {}
scripts_copied = set()
src_owner_uuid = None
def main():
 80def main():
 81    copy_opts = argparse.ArgumentParser(add_help=False)
 82
 83    copy_opts.add_argument(
 84        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
 85        help='Print version and exit.')
 86    copy_opts.add_argument(
 87        '-v', '--verbose', dest='verbose', action='store_true',
 88        help='Verbose output.')
 89    copy_opts.add_argument(
 90        '--progress', dest='progress', action='store_true',
 91        help='Report progress on copying collections. (default)')
 92    copy_opts.add_argument(
 93        '--no-progress', dest='progress', action='store_false',
 94        help='Do not report progress on copying collections.')
 95    copy_opts.add_argument(
 96        '-f', '--force', dest='force', action='store_true',
 97        help='Perform copy even if the object appears to exist at the remote destination.')
 98    copy_opts.add_argument(
 99        '--src', dest='source_arvados',
100        help="""
101Client configuration location for the source Arvados cluster.
102May be either a configuration file path, or a plain identifier like `foo`
103to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
104If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
105""",
106    )
107    copy_opts.add_argument(
108        '--dst', dest='destination_arvados',
109        help="""
110Client configuration location for the destination Arvados cluster.
111May be either a configuration file path, or a plain identifier like `foo`
112to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
113If not provided, will use the default client configuration from the environment or `settings.conf`.
114""",
115    )
116    copy_opts.add_argument(
117        '--recursive', dest='recursive', action='store_true',
118        help='Recursively copy any dependencies for this object, and subprojects. (default)')
119    copy_opts.add_argument(
120        '--no-recursive', dest='recursive', action='store_false',
121        help='Do not copy any dependencies or subprojects.')
122    copy_opts.add_argument(
123        '--project-uuid', dest='project_uuid',
124        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
125    copy_opts.add_argument(
126        '--replication',
127        type=arv_cmd.RangedValue(int, range(1, sys.maxsize)),
128        metavar='N',
129        help="""
130Number of replicas per storage class for the copied collections at the destination.
131If not provided (or if provided with invalid value),
132use the destination's default replication-level setting (if found),
133or the fallback value 2.
134""")
135    copy_opts.add_argument(
136        '--storage-classes',
137        type=arv_cmd.UniqueSplit(),
138        help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
139    copy_opts.add_argument("--varying-url-params", type=str, default="",
140                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
141
142    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
143                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
144
145    copy_opts.add_argument(
146        'object_uuid',
147        help='The UUID of the object to be copied.')
148    copy_opts.set_defaults(progress=True)
149    copy_opts.set_defaults(recursive=True)
150
151    parser = argparse.ArgumentParser(
152        description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
153        parents=[copy_opts, arv_cmd.retry_opt])
154    args = parser.parse_args()
155
156    if args.verbose:
157        arvlogger.setLevel(logging.DEBUG)
158    else:
159        arvlogger.setLevel(logging.INFO)
160        keeplogger.setLevel(logging.WARNING)
161
162    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
163        args.source_arvados = args.object_uuid[:5]
164
165    if not args.destination_arvados and args.project_uuid:
166        args.destination_arvados = args.project_uuid[:5]
167
168    # Make sure errors trying to connect to clusters get logged.
169    googleapi_logger.setLevel(logging.WARN)
170    googleapi_logger.addHandler(log_handler)
171
172    # Create API clients for the source and destination instances
173    src_arv = api_for_instance(args.source_arvados, args.retries)
174    dst_arv = api_for_instance(args.destination_arvados, args.retries)
175
176    # Once we've successfully contacted the clusters, we probably
177    # don't want to see logging about retries (unless the user asked
178    # for verbose output).
179    if not args.verbose:
180        googleapi_logger.setLevel(logging.ERROR)
181
182    if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]:
183        logger.info("Copying within cluster %s", src_arv.config()["ClusterID"])
184    else:
185        logger.info("Source cluster is %s", src_arv.config()["ClusterID"])
186        logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"])
187
188    if not args.project_uuid:
189        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
190
191    # Identify the kind of object we have been given, and begin copying.
192    t = uuid_type(src_arv, args.object_uuid)
193
194    try:
195        if t == 'Collection':
196            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
197            result = copy_collection(args.object_uuid,
198                                     src_arv, dst_arv,
199                                     args)
200        elif t == 'Workflow':
201            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
202            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
203        elif t == 'Group':
204            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
205            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
206        elif t == 'httpURL' or t == 's3URL':
207            result = copy_from_url(args.object_uuid, src_arv, dst_arv, args)
208        else:
209            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
210    except Exception as e:
211        logger.error("%s", e, exc_info=args.verbose)
212        exit(1)
213
214    # Clean up any outstanding temp git repositories.
215    for d in local_repo_dir.values():
216        shutil.rmtree(d, ignore_errors=True)
217
218    if not result:
219        exit(1)
220
221    # If no exception was thrown and the response does not have an
222    # error_token field, presume success
223    if result is None or 'error_token' in result or 'uuid' not in result:
224        if result:
225            logger.error("API server returned an error result: {}".format(result))
226        exit(1)
227
228    print(result['uuid'])
229
230    if result.get('partial_error'):
231        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
232        exit(1)
233
234    logger.info("Success: created copy with uuid {}".format(result['uuid']))
235    exit(0)
def set_src_owner_uuid(resource, uuid, args):
237def set_src_owner_uuid(resource, uuid, args):
238    global src_owner_uuid
239    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
240    src_owner_uuid = c.get("owner_uuid")
def api_for_instance(instance_name, num_retries):
254def api_for_instance(instance_name, num_retries):
255    msg = []
256    if instance_name:
257        if '/' in instance_name:
258            config_file = instance_name
259        else:
260            dirs = basedirs.BaseDirectories('CONFIG')
261            config_file = next(dirs.search(f'{instance_name}.conf'), '')
262
263        try:
264            cfg = arvados.config.load(config_file)
265
266            if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
267                api_is_insecure = (
268                    cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
269                        ['1', 't', 'true', 'y', 'yes']))
270                return arvados.api('v1',
271                                     host=cfg['ARVADOS_API_HOST'],
272                                     token=cfg['ARVADOS_API_TOKEN'],
273                                     insecure=api_is_insecure,
274                                     num_retries=num_retries,
275                                     )
276            else:
277                msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file))
278        except OSError as e:
279            if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH):
280                verb = 'connect to instance from'
281            elif config_file:
282                verb = 'open'
283            else:
284                verb = 'find'
285                searchlist = ":".join(str(p) for p in dirs.search_paths())
286                config_file = f'{instance_name}.conf in path {searchlist}'
287            msg.append(("Could not {} config file {}: {}").format(
288                       verb, config_file, e.strerror))
289        except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e:
290            msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e))
291
292    default_api = None
293    default_instance = None
294    try:
295        default_api = arvados.api('v1', num_retries=num_retries)
296        default_instance = default_api.config()["ClusterID"]
297    except ValueError:
298        pass
299    except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e:
300        msg.append("Failed to connect to default instance, error was {}".format(e))
301
302    if default_api is not None and (not instance_name or instance_name == default_instance):
303        # Use default settings
304        return default_api
305
306    if instance_name and default_instance and instance_name != default_instance:
307        msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name))
308
309    for m in msg:
310        logger.error(m)
311
312    abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')
def check_git_availability():
315def check_git_availability():
316    try:
317        subprocess.run(
318            ['git', '--version'],
319            check=True,
320            stdout=subprocess.DEVNULL,
321        )
322    except FileNotFoundError:
323        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
326def filter_iter(arg):
327    """Iterate a filter string-or-list.
328
329    Pass in a filter field that can either be a string or list.
330    This will iterate elements as if the field had been written as a list.
331    """
332    if isinstance(arg, str):
333        yield arg
334    else:
335        yield from arg

Iterate a filter string-or-list.

Pass in a filter field that can either be a string or list. This will iterate elements as if the field had been written as a list.

def migrate_repository_filter(repo_filter, src_repository, dst_repository):
337def migrate_repository_filter(repo_filter, src_repository, dst_repository):
338    """Update a single repository filter in-place for the destination.
339
340    If the filter checks that the repository is src_repository, it is
341    updated to check that the repository is dst_repository.  If it does
342    anything else, this function raises ValueError.
343    """
344    if src_repository is None:
345        raise ValueError("component does not specify a source repository")
346    elif dst_repository is None:
347        raise ValueError("no destination repository specified to update repository filter")
348    elif repo_filter[1:] == ['=', src_repository]:
349        repo_filter[2] = dst_repository
350    elif repo_filter[1:] == ['in', [src_repository]]:
351        repo_filter[2] = [dst_repository]
352    else:
353        raise ValueError("repository filter is not a simple source match")

355def migrate_script_version_filter(version_filter):
356    """Update a single script_version filter in-place for the destination.
357
358    Currently this function checks that all the filter operands are Git
359    commit hashes.  If they're not, it raises ValueError to indicate that
360    the filter is not portable.  It could be extended to make other
361    transformations in the future.
362    """
363    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
364        raise ValueError("script_version filter is not limited to commit hashes")
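
Again as a sketch: hexadecimal commit hashes pass, while symbolic refs are rejected because they may resolve differently on the destination cluster:

    from arvados.commands.arv_copy import migrate_script_version_filter

    # Both operands are commit hashes, so this returns without complaint.
    migrate_script_version_filter(['script_version', 'in', ['acbd18db', 'deadbeef']])

    try:
        migrate_script_version_filter(['script_version', '=', 'main'])
    except ValueError as err:
        print(err)   # script_version filter is not limited to commit hashes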

366def attr_filtered(filter_, *attr_names):
367    """Return True if filter_ applies to any of attr_names, else False."""
368    return any((name == 'any') or (name in attr_names)
369               for name in filter_iter(filter_[0]))
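
A quick demonstration; note that the literal attribute name 'any' matches every attribute:

    from arvados.commands.arv_copy import attr_filtered

    assert attr_filtered(['repository', '=', 'foo'], 'repository', 'script_version')
    assert attr_filtered(['any', 'in', ['x']], 'repository')
    assert not attr_filtered(['created_at', '>', '2024-01-01'], 'repository')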

371@contextlib.contextmanager
372def exception_handler(handler, *exc_types):
373    """If any exc_types are raised in the block, call handler on the exception."""
374    try:
375        yield
376    except exc_types as error:
377        handler(error)
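
A usage sketch: exceptions of the listed types raised inside the block are routed to the handler instead of propagating:

    from arvados.commands.arv_copy import exception_handler

    with exception_handler(lambda err: print('handled:', err), OSError):
        open('/nonexistent/path')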

392def copy_workflow(wf_uuid, src, dst, args):
393    # fetch the workflow from the source instance
394    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
395
396    if not wf["definition"]:
397        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
398
399    # copy collections and docker images
400    if args.recursive and wf["definition"]:
401        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
402               "ARVADOS_API_TOKEN": src.api_token,
403               "PATH": os.environ["PATH"]}
404        try:
405            result = subprocess.run(
406                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
407                env=env,
408                stdout=subprocess.PIPE,
409                universal_newlines=True,
410            )
411        except FileNotFoundError:
412            no_arv_copy = True
413        else:
414            no_arv_copy = result.returncode == 2
415
416        if no_arv_copy:
417            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
418        elif result.returncode != 0:
419            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
420
421        locations = json.loads(result.stdout)
422
423        if locations:
424            copy_collections(locations, src, dst, args)
425
426    # copy the workflow itself
427    del wf['uuid']
428    wf['owner_uuid'] = args.project_uuid
429
430    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
431                                             ["name", "=", wf["name"]]]).execute()
432    if len(existing["items"]) == 0:
433        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
434    else:
435        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
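
The runner subprocess above is given only the source cluster's credentials plus PATH. As a small sketch of the ARVADOS_API_HOST derivation, using a hypothetical cluster URL:

    import urllib.parse

    # urlparse().netloc keeps host and port, drops scheme and path.
    root_url = 'https://zzzzz.arvadosapi.com:443/arvados/v1'
    print(urllib.parse.urlparse(root_url).netloc)   # zzzzz.arvadosapi.com:443
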
438def workflow_collections(obj, locations, docker_images):
439    if isinstance(obj, dict):
440        loc = obj.get('location', None)
441        if loc is not None:
442            if loc.startswith("keep:"):
443                locations.append(loc[5:])
444
445        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
446        if docker_image is not None:
447            ds = docker_image.split(":", 1)
448            tag = ds[1] if len(ds)==2 else 'latest'
449            docker_images[ds[0]] = tag
450
451        for x in obj:
452            workflow_collections(obj[x], locations, docker_images)
453    elif isinstance(obj, list):
454        for x in obj:
455            workflow_collections(x, locations, docker_images)
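
A toy traversal with hypothetical values, showing what ends up in each accumulator:

    from arvados.commands.arv_copy import workflow_collections

    wf = {
        'steps': [
            {'location': 'keep:acbd18db4cc2f85cedef654fccc4a4d8+3/input.txt'},
            {'dockerPull': 'ubuntu:22.04'},
            {'dockerImageId': 'myimage'},
        ],
    }
    locations, docker_images = [], {}
    workflow_collections(wf, locations, docker_images)
    print(locations)      # ['acbd18db4cc2f85cedef654fccc4a4d8+3/input.txt']
    print(docker_images)  # {'ubuntu': '22.04', 'myimage': 'latest'}
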
469def copy_collections(obj, src, dst, args):
470
471    def copy_collection_fn(collection_match):
472        """Helper function for regex substitution: copies a single collection,
473        identified by the collection_match MatchObject, to the
474        destination.  Returns the destination collection uuid (or the
475        portable data hash if that's what src_id is).
476
477        """
478        src_id = collection_match.group(0)
479        if src_id not in collections_copied:
480            dst_col = copy_collection(src_id, src, dst, args)
481            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
482                collections_copied[src_id] = src_id
483            else:
484                collections_copied[src_id] = dst_col['uuid']
485        return collections_copied[src_id]
486
487    if isinstance(obj, str):
488        # Copy any collections identified in this string to dst, replacing
489        # them with the dst uuids as necessary.
490        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
491        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
492        return obj
493    elif isinstance(obj, dict):
494        return type(obj)((v, copy_collections(obj[v], src, dst, args))
495                         for v in obj)
496    elif isinstance(obj, list):
497        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
498    return obj
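
The substitutions above rely on two compiled patterns from arvados.util; a quick look at what they match, using hypothetical identifiers:

    import arvados.util

    s = ('input is keep:acbd18db4cc2f85cedef654fccc4a4d8+3 '
         'owned by zzzzz-4zz18-zzzzzzzzzzzzzzz')
    print(arvados.util.portable_data_hash_pattern.findall(s))
    # ['acbd18db4cc2f85cedef654fccc4a4d8+3']
    print(arvados.util.collection_uuid_pattern.findall(s))
    # ['zzzzz-4zz18-zzzzzzzzzzzzzzz']
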
501def total_collection_size(manifest_text):
502    """Return the total number of bytes in this collection (excluding
503    duplicate blocks)."""
504
505    total_bytes = 0
506    locators_seen = {}
507    for line in manifest_text.splitlines():
508        words = line.split()
509        for word in words[1:]:
510            try:
511                loc = arvados.KeepLocator(word)
512            except ValueError:
513                continue  # this word isn't a locator, skip it
514            if loc.md5sum not in locators_seen:
515                locators_seen[loc.md5sum] = True
516                total_bytes += loc.size
517
518    return total_bytes
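
For example (hypothetical locator): two manifest streams referencing the same 3-byte block count it only once:

    from arvados.commands.arv_copy import total_collection_size

    manifest = (". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"
                "./dir acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar.txt\n")
    print(total_collection_size(manifest))   # 3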

520def create_collection_from(c, src, dst, args):
521    """Create a new collection record on dst, and copy Docker metadata if
522    available."""
523
524    collection_uuid = c['uuid']
525    body = {}
526    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
527        body[d] = c[d]
528
529    if not body["name"]:
530        body['name'] = "copied from " + collection_uuid
531
532    if args.storage_classes:
533        body['storage_classes_desired'] = args.storage_classes
534
535    body['owner_uuid'] = args.project_uuid
536
537    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
538
539    # Create docker_image_repo+tag and docker_image_hash links
540    # at the destination.
541    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
542        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
543
544        for src_link in docker_links:
545            body = {key: src_link[key]
546                    for key in ['link_class', 'name', 'properties']}
547            body['head_uuid'] = dst_collection['uuid']
548            body['owner_uuid'] = args.project_uuid
549
550            lk = dst.links().create(body=body).execute(num_retries=args.retries)
551            logger.debug('created dst link {}'.format(lk))
552
553    return dst_collection

577def copy_collection(obj_uuid, src, dst, args):
578    if arvados.util.keep_locator_pattern.match(obj_uuid):
579        # If the obj_uuid is a portable data hash, it might not be
580        # uniquely identified with a particular collection.  As a
581        # result, it is ambiguous as to what name to use for the copy.
582        # Apply some heuristics to pick which collection to get the
583        # name from.
584        srccol = src.collections().list(
585            filters=[['portable_data_hash', '=', obj_uuid]],
586            order="created_at asc"
587            ).execute(num_retries=args.retries)
588
589        items = srccol.get("items")
590
591        if not items:
592            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
593            return
594
595        c = None
596
597        if len(items) == 1:
598            # There's only one collection with the PDH, so use that.
599            c = items[0]
600        if not c:
601            # See if there is a collection that's in the same project
602            # as the root item (usually a workflow) being copied.
603            for i in items:
604                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
605                    c = i
606                    break
607        if not c:
608            # Didn't find any collections located in the same project, so
609            # pick the oldest collection that has a name assigned to it.
610            for i in items:
611                if i.get("name"):
612                    c = i
613                    break
614        if not c:
615            # None of the collections have names (?!), so just pick the
616            # first one.
617            c = items[0]
618
619        # list() doesn't return manifest text (and we don't want it to,
620        # because we don't need the same manifest text sent to us 50
621        # times) so go and retrieve the collection object directly
622        # which will include the manifest text.
623        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
624    else:
625        # Assume this is an actual collection uuid, so fetch it directly.
626        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
627
628    # If a collection with this hash already exists at the
629    # destination, and 'force' is not true, just return that
630    # collection.
631    if not args.force:
632        if 'portable_data_hash' in c:
633            colhash = c['portable_data_hash']
634        else:
635            colhash = c['uuid']
636        dstcol = dst.collections().list(
637            filters=[['portable_data_hash', '=', colhash]]
638        ).execute(num_retries=args.retries)
639        if dstcol['items_available'] > 0:
640            for d in dstcol['items']:
641                if ((args.project_uuid == d['owner_uuid']) and
642                    (c.get('name') == d['name']) and
643                    (c['portable_data_hash'] == d['portable_data_hash'])):
644                    return d
645            c['manifest_text'] = dst.collections().get(
646                uuid=dstcol['items'][0]['uuid']
647            ).execute(num_retries=args.retries)['manifest_text']
648            return create_collection_from(c, src, dst, args)
649
650    if args.replication is None:
651        # Obtain default or fallback collection replication setting on the
652        # destination
653        try:
654            args.replication = int(dst.config()["Collections"]["DefaultReplication"])
655        except (KeyError, TypeError, ValueError):
656            args.replication = 2
657
658    # Fetch the collection's manifest.
659    manifest = c['manifest_text']
660    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
661
662    # Copy each block from src_keep to dst_keep.
663    # Use the newly signed locators returned from dst_keep to build
664    # a new manifest as we go.
665    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
666    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
667    dst_manifest = io.StringIO()
668    dst_locators = {}
669    bytes_written = 0
670    bytes_expected = total_collection_size(manifest)
671    if args.progress:
672        progress_writer = ProgressWriter(human_progress)
673    else:
674        progress_writer = None
675
676    # go through the words
677    # put each block loc into 'get' queue
678    # 'get' threads get block and put it into 'put' queue
679    # 'put' threads put block and then update dst_locators
680    #
681    # after going through the whole manifest we go back through it
682    # again and build dst_manifest
683
684    lock = threading.Lock()
685
686    # the get queue should be unbounded because we'll add all the
687    # block hashes we want to get, but these are small
688    get_queue = queue.Queue()
689
690    threadcount = 4
691
692    # the put queue contains full data blocks
693    # and if 'get' is faster than 'put' we could end up consuming
694    # a great deal of RAM if it isn't bounded.
695    put_queue = queue.Queue(threadcount)
696    transfer_error = []
697
698    def get_thread():
699        while True:
700            word = get_queue.get()
701            if word is None:
702                put_queue.put(None)
703                get_queue.task_done()
704                return
705
706            blockhash = arvados.KeepLocator(word).md5sum
707            with lock:
708                if blockhash in dst_locators:
709                    # Already uploaded
710                    get_queue.task_done()
711                    continue
712
713            try:
714                logger.debug("Getting block %s", word)
715                data = src_keep.get(word)
716                put_queue.put((word, data))
717            except Exception as e:
718                logger.error("Error getting block %s: %s", word, e)
719                transfer_error.append(e)
720                try:
721                    # Drain the 'get' queue so we end early
722                    while True:
723                        get_queue.get(False)
724                        get_queue.task_done()
725                except queue.Empty:
726                    pass
727            finally:
728                get_queue.task_done()
729
730    def put_thread():
731        nonlocal bytes_written
732        while True:
733            item = put_queue.get()
734            if item is None:
735                put_queue.task_done()
736                return
737
738            word, data = item
739            loc = arvados.KeepLocator(word)
740            blockhash = loc.md5sum
741            with lock:
742                if blockhash in dst_locators:
743                    # Already uploaded
744                    put_queue.task_done()
745                    continue
746
747            try:
748                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
749                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
750                with lock:
751                    dst_locators[blockhash] = dst_locator
752                    bytes_written += loc.size
753                    if progress_writer:
754                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
755            except Exception as e:
756                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
757                try:
758                    # Drain the 'get' queue so we end early
759                    while True:
760                        get_queue.get(False)
761                        get_queue.task_done()
762                except queue.Empty:
763                    pass
764                transfer_error.append(e)
765            finally:
766                put_queue.task_done()
767
768    for line in manifest.splitlines():
769        words = line.split()
770        for word in words[1:]:
771            try:
772                loc = arvados.KeepLocator(word)
773            except ValueError:
774                # If 'word' can't be parsed as a locator,
775                # presume it's a filename.
776                continue
777
778            get_queue.put(word)
779
780    for i in range(0, threadcount):
781        get_queue.put(None)
782
783    for i in range(0, threadcount):
784        threading.Thread(target=get_thread, daemon=True).start()
785
786    for i in range(0, threadcount):
787        threading.Thread(target=put_thread, daemon=True).start()
788
789    get_queue.join()
790    put_queue.join()
791
792    if len(transfer_error) > 0:
793        return {"error_token": "Failed to transfer blocks"}
794
795    for line in manifest.splitlines():
796        words = line.split()
797        dst_manifest.write(words[0])
798        for word in words[1:]:
799            try:
800                loc = arvados.KeepLocator(word)
801            except ValueError:
802                # If 'word' can't be parsed as a locator,
803                # presume it's a filename.
804                dst_manifest.write(' ')
805                dst_manifest.write(word)
806                continue
807            blockhash = loc.md5sum
808            dst_manifest.write(' ')
809            dst_manifest.write(dst_locators[blockhash])
810        dst_manifest.write("\n")
811
812    if progress_writer:
813        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
814        progress_writer.finish()
815
816    # Copy the manifest and save the collection.
817    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
818
819    c['manifest_text'] = dst_manifest.getvalue()
820    return create_collection_from(c, src, dst, args)
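
The shutdown handshake above depends on queueing one None sentinel per 'get' thread, each of which forwards the sentinel downstream so every 'put' thread also stops. A minimal, self-contained sketch of the same pattern (a generic producer/consumer, not Keep-specific):

    import queue
    import threading

    get_queue = queue.Queue()       # unbounded: holds small work items
    put_queue = queue.Queue(4)      # bounded: holds bulky results
    results, lock = [], threading.Lock()

    def getter():
        while True:
            item = get_queue.get()
            if item is None:
                put_queue.put(None)          # forward the sentinel
                get_queue.task_done()
                return
            put_queue.put(item * 2)          # stand-in for src_keep.get()
            get_queue.task_done()

    def putter():
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return
            with lock:
                results.append(item)         # stand-in for dst_keep.put()
            put_queue.task_done()

    threadcount = 4
    for i in range(8):
        get_queue.put(i)
    for _ in range(threadcount):
        get_queue.put(None)                  # one sentinel per get thread
        threading.Thread(target=getter, daemon=True).start()
        threading.Thread(target=putter, daemon=True).start()
    get_queue.join()
    put_queue.join()
    print(sorted(results))   # [0, 2, 4, 6, 8, 10, 12, 14]
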
822def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
823    """Copy the docker image identified by docker_image and
824    docker_image_tag from src to dst. Create appropriate
825    docker_image_repo+tag and docker_image_hash links at dst.
826
827    """
828
829    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
830
831    # Find the link identifying this docker image.
832    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
833        src, args.retries, docker_image, docker_image_tag)
834    if docker_image_list:
835        image_uuid, image_info = docker_image_list[0]
836        logger.debug('copying collection {} {}'.format(image_uuid, image_info))
837
838        # Copy the collection it refers to.
839        dst_image_col = copy_collection(image_uuid, src, dst, args)
840    elif arvados.util.keep_locator_pattern.match(docker_image):
841        dst_image_col = copy_collection(docker_image, src, dst, args)
842    else:
843        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

845def copy_project(obj_uuid, src, dst, owner_uuid, args):
846
847    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
848
849    # Create/update the destination project
850    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
851                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
852    if len(existing["items"]) == 0:
853        project_record = dst.groups().create(body={"group": {"group_class": "project",
854                                                             "owner_uuid": owner_uuid,
855                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
856    else:
857        project_record = existing["items"][0]
858
859    dst.groups().update(uuid=project_record["uuid"],
860                        body={"group": {
861                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)
862
863    args.project_uuid = project_record["uuid"]
864
865    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
866
867
868    partial_error = ""
869
870    # Copy collections
871    try:
872        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
873                         src, dst, args)
874    except Exception as e:
875        partial_error += "\n" + str(e)
876
877    # Copy workflows
878    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
879        try:
880            copy_workflow(w["uuid"], src, dst, args)
881        except Exception as e:
882            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
883
884    if args.recursive:
885        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
886            try:
887                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
888            except Exception as e:
889                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
890
891    project_record["partial_error"] = partial_error
892
893    return project_record
901def git_rev_parse(rev, repo):
902    proc = subprocess.run(
903        ['git', 'rev-parse', rev],
904        check=True,
905        cwd=repo,
906        stdout=subprocess.PIPE,
907        text=True,
908    )
909    return proc.stdout.strip()
921def uuid_type(api, object_uuid):
922    if re.match(arvados.util.keep_locator_pattern, object_uuid):
923        return 'Collection'
924
925    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
926        return 'httpURL'
927
928    if object_uuid.startswith("s3:"):
929        return 's3URL'
930
931    p = object_uuid.split('-')
932    if len(p) == 3:
933        type_prefix = p[1]
934        for k in api._schema.schemas:
935            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
936            if type_prefix == obj_class:
937                return k
938    return None
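
Arvados UUIDs are three dash-separated fields whose middle field encodes the record type; a sketch with a hypothetical collection UUID:

    object_uuid = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
    p = object_uuid.split('-')
    print(len(p) == 3, p[1])   # True 4zz18 -- compared against each schema's uuidPrefix
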
941def copy_from_url(url, src, dst, args):
942
943    project_uuid = args.project_uuid
944    # Pull the caching preference consulted by both the cache check and the download.
945    prefer_cached_downloads = args.prefer_cached_downloads
946
947    cached = to_keep_util.CheckCacheResult(None, None, None, None, None)
948
949    if url.startswith("http:") or url.startswith("https:"):
950        cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
951                                               varying_url_params=args.varying_url_params,
952                                               prefer_cached_downloads=prefer_cached_downloads)
953    elif url.startswith("s3:"):
954        import boto3.session
955        botosession = boto3.session.Session()
956        cached = s3_to_keep.check_cached_url(src, botosession, project_uuid, url, {},
957                                             prefer_cached_downloads=prefer_cached_downloads)
958
959    if cached[2] is not None:
960        return copy_collection(cached[2], src, dst, args)
961
962    if url.startswith("http:") or url.startswith("https:"):
963        cached = http_to_keep.http_to_keep(dst, project_uuid, url,
964                                           varying_url_params=args.varying_url_params,
965                                           prefer_cached_downloads=prefer_cached_downloads)
966    elif url.startswith("s3:"):
967        cached = s3_to_keep.s3_to_keep(dst, botosession, project_uuid, url,
968                                       prefer_cached_downloads=prefer_cached_downloads)
969
970    if cached is not None:
971        return {"uuid": cached[2]}
974def abort(msg, code=1):
975    logger.info("arv-copy: %s", msg)
976    exit(code)
985def machine_progress(obj_uuid, bytes_written, bytes_expected):
986    return "{} {}: {} {} written {} total\n".format(
987        sys.argv[0],
988        os.getpid(),
989        obj_uuid,
990        bytes_written,
991        -1 if (bytes_expected is None) else bytes_expected)
 993def human_progress(obj_uuid, bytes_written, bytes_expected):
 994    if bytes_expected:
 995        return "\r{}: {}M / {}M {:.1%} ".format(
 996            obj_uuid,
 997            bytes_written >> 20, bytes_expected >> 20,
 998            float(bytes_written) / bytes_expected)
 999    else:
1000        return "\r{}: {} ".format(obj_uuid, bytes_written)
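
For instance, with 512 MiB of a 1 GiB collection written (hypothetical UUID), the >> 20 shifts convert byte counts to MiB:

    from arvados.commands.arv_copy import human_progress

    print(human_progress('zzzzz-4zz18-zzzzzzzzzzzzzzz', 512 << 20, 1 << 30))
    # '\rzzzzz-4zz18-zzzzzzzzzzzzzzz: 512M / 1024M 50.0% '
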
1002class ProgressWriter(object):
1003    _progress_func = None
1004    outfile = sys.stderr
1005
1006    def __init__(self, progress_func):
1007        self._progress_func = progress_func
1008
1009    def report(self, obj_uuid, bytes_written, bytes_expected):
1010        if self._progress_func is not None:
1011            self.outfile.write(
1012                self._progress_func(obj_uuid, bytes_written, bytes_expected))
1013
1014    def finish(self):
1015        self.outfile.write("\n")
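
A usage sketch: ProgressWriter pairs one of the formatters above with stderr, emitting carriage-return updates and a final newline:

    from arvados.commands.arv_copy import ProgressWriter, human_progress

    pw = ProgressWriter(human_progress)
    pw.report('zzzzz-4zz18-zzzzzzzzzzzzzzz', 1 << 20, 10 << 20)
    pw.finish()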