arvados.commands.arv_copy

  1# Copyright (C) The Arvados Authors. All rights reserved.
  2#
  3# SPDX-License-Identifier: Apache-2.0
  4
  5# arv-copy [--recursive] [--no-recursive] object-uuid
  6#
  7# Copies an object from Arvados instance src to instance dst.
  8#
  9# By default, arv-copy recursively copies any dependent objects
 10# necessary to make the object functional in the new instance
 11# (e.g. for a workflow, arv-copy copies the workflow,
 12# input collections, and docker images). If
 13# --no-recursive is given, arv-copy copies only the single record
 14# identified by object-uuid.
 15#
 16# The user must have configuration files {src}.conf and
 17# {dst}.conf in a standard configuration directory with valid login credentials
 18# for instances src and dst.  If either of these files is not found,
 19# arv-copy will issue an error.
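#
# Editor's sketch of a typical invocation (cluster IDs, paths, and token
# values are hypothetical):
#
#   $ cat ~/.config/arvados/pirca.conf
#   ARVADOS_API_HOST=pirca.example.org
#   ARVADOS_API_TOKEN=v2/pirca-gj3su-xxxxxxxxxxxxxxx/xxxxxxxxxxxxxxxxxxxx
#
#   $ arv-copy --src pirca --dst jutro pirca-4zz18-xxxxxxxxxxxxxxx
#   jutro-4zz18-yyyyyyyyyyyyyyy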
 20
 21import argparse
 22import contextlib
 23import getpass
 24import os
 25import re
 26import shutil
 27import subprocess
 28import sys
 29import logging
 30import tempfile
 31import urllib.parse
 32import io
 33import json
 34import queue
 35import threading
 36import errno
 37
 38import httplib2.error
 39import googleapiclient
 40
 41import arvados
 42import arvados.config
 43import arvados.keep
 44import arvados.util
 45import arvados.commands._util as arv_cmd
 46import arvados.commands.keepdocker
 47from arvados.logging import log_handler
 48
 49from arvados._internal import basedirs, http_to_keep
 50from arvados._version import __version__
 51
 52COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
 53
 54logger = logging.getLogger('arvados.arv-copy')
 55
 56# Set this up so connection errors get logged.
 57googleapi_logger = logging.getLogger('googleapiclient.http')
 58
 59# local_repo_dir records which git repositories from the Arvados source
 60# instance have been checked out locally during this run, and to which
 61# directories.
 62# e.g. if repository 'twp' from src_arv has been cloned into
 63# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
 64#
 65local_repo_dir = {}
 66
 67# List of collections that have been copied in this session, and their
 68# destination collection UUIDs.
 69collections_copied = {}
 70
 71# Set of (repository, script_version) two-tuples of commits copied in git.
 72scripts_copied = set()
 73
 74# The owner_uuid of the object being copied
 75src_owner_uuid = None
 76
 77def main():
 78    copy_opts = argparse.ArgumentParser(add_help=False)
 79
 80    copy_opts.add_argument(
 81        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
 82        help='Print version and exit.')
 83    copy_opts.add_argument(
 84        '-v', '--verbose', dest='verbose', action='store_true',
 85        help='Verbose output.')
 86    copy_opts.add_argument(
 87        '--progress', dest='progress', action='store_true',
 88        help='Report progress on copying collections. (default)')
 89    copy_opts.add_argument(
 90        '--no-progress', dest='progress', action='store_false',
 91        help='Do not report progress on copying collections.')
 92    copy_opts.add_argument(
 93        '-f', '--force', dest='force', action='store_true',
 94        help='Perform copy even if the object appears to exist at the remote destination.')
 95    copy_opts.add_argument(
 96        '--src', dest='source_arvados',
 97        help="""
 98Client configuration location for the source Arvados cluster.
 99May be either a configuration file path, or a plain identifier like `foo`
100to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
101If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
102""",
103    )
104    copy_opts.add_argument(
105        '--dst', dest='destination_arvados',
106        help="""
107Client configuration location for the destination Arvados cluster.
108May be either a configuration file path, or a plain identifier like `foo`
109to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
110If not provided, will use the default client configuration from the environment or `settings.conf`.
111""",
112    )
113    copy_opts.add_argument(
114        '--recursive', dest='recursive', action='store_true',
115        help='Recursively copy any dependencies for this object, and subprojects. (default)')
116    copy_opts.add_argument(
117        '--no-recursive', dest='recursive', action='store_false',
118        help='Do not copy any dependencies or subprojects.')
119    copy_opts.add_argument(
120        '--project-uuid', dest='project_uuid',
121        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
122    copy_opts.add_argument(
123        '--replication',
124        type=arv_cmd.RangedValue(int, range(1, sys.maxsize)),
125        metavar='N',
126        help="""
127Number of replicas per storage class for the copied collections at the destination.
 128If not provided (or if provided with an invalid value),
129use the destination's default replication-level setting (if found),
130or the fallback value 2.
131""")
132    copy_opts.add_argument(
133        '--storage-classes',
134        type=arv_cmd.UniqueSplit(),
 135        help='Comma-separated list of storage classes to be used when saving data to the destination Arvados instance.')
136    copy_opts.add_argument("--varying-url-params", type=str, default="",
 137                        help="A comma-separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
138
139    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
 140                        help="If an HTTP URL is found in Keep, skip the upstream URL freshness check (it will not notice if the upstream has changed, but it also will not error if the upstream is unavailable).")
141
142    copy_opts.add_argument(
143        'object_uuid',
144        help='The UUID of the object to be copied.')
145    copy_opts.set_defaults(progress=True)
146    copy_opts.set_defaults(recursive=True)
147
148    parser = argparse.ArgumentParser(
149        description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
150        parents=[copy_opts, arv_cmd.retry_opt])
151    args = parser.parse_args()
152
153    if args.verbose:
154        logger.setLevel(logging.DEBUG)
155    else:
156        logger.setLevel(logging.INFO)
157
158    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
159        args.source_arvados = args.object_uuid[:5]
160
161    if not args.destination_arvados and args.project_uuid:
162        args.destination_arvados = args.project_uuid[:5]
163
164    # Make sure errors trying to connect to clusters get logged.
165    googleapi_logger.setLevel(logging.WARN)
166    googleapi_logger.addHandler(log_handler)
167
168    # Create API clients for the source and destination instances
169    src_arv = api_for_instance(args.source_arvados, args.retries)
170    dst_arv = api_for_instance(args.destination_arvados, args.retries)
171
172    # Once we've successfully contacted the clusters, we probably
173    # don't want to see logging about retries (unless the user asked
174    # for verbose output).
175    if not args.verbose:
176        googleapi_logger.setLevel(logging.ERROR)
177
178    if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]:
179        logger.info("Copying within cluster %s", src_arv.config()["ClusterID"])
180    else:
181        logger.info("Source cluster is %s", src_arv.config()["ClusterID"])
182        logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"])
183
184    if not args.project_uuid:
185        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
186
187    # Identify the kind of object we have been given, and begin copying.
188    t = uuid_type(src_arv, args.object_uuid)
189
190    try:
191        if t == 'Collection':
192            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
193            result = copy_collection(args.object_uuid,
194                                     src_arv, dst_arv,
195                                     args)
196        elif t == 'Workflow':
197            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
198            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
199        elif t == 'Group':
200            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
201            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
202        elif t == 'httpURL':
203            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
204        else:
205            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
206    except Exception as e:
207        logger.error("%s", e, exc_info=args.verbose)
208        exit(1)
209
210    # Clean up any outstanding temp git repositories.
211    for d in local_repo_dir.values():
212        shutil.rmtree(d, ignore_errors=True)
213
214    if not result:
215        exit(1)
216
217    # If no exception was thrown and the response does not have an
218    # error_token field, presume success
219    if result is None or 'error_token' in result or 'uuid' not in result:
220        if result:
221            logger.error("API server returned an error result: {}".format(result))
222        exit(1)
223
224    print(result['uuid'])
225
226    if result.get('partial_error'):
227        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
228        exit(1)
229
230    logger.info("Success: created copy with uuid {}".format(result['uuid']))
231    exit(0)
232
233def set_src_owner_uuid(resource, uuid, args):
234    global src_owner_uuid
235    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
236    src_owner_uuid = c.get("owner_uuid")
237
238# api_for_instance(instance_name)
239#
240#     Creates an API client for the Arvados instance identified by
241#     instance_name.
242#
243#     If instance_name contains a slash, it is presumed to be a path
  244#     (either relative or absolute) to a file with Arvados configuration
245#     settings.
246#
247#     Otherwise, it is presumed to be the name of a file in a standard
248#     configuration directory.
249#
250def api_for_instance(instance_name, num_retries):
251    msg = []
252    if instance_name:
253        if '/' in instance_name:
254            config_file = instance_name
255        else:
256            dirs = basedirs.BaseDirectories('CONFIG')
257            config_file = next(dirs.search(f'{instance_name}.conf'), '')
258
259        try:
260            cfg = arvados.config.load(config_file)
261
262            if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
263                api_is_insecure = (
264                    cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
265                        ['1', 't', 'true', 'y', 'yes']))
266                return arvados.api('v1',
267                                     host=cfg['ARVADOS_API_HOST'],
268                                     token=cfg['ARVADOS_API_TOKEN'],
269                                     insecure=api_is_insecure,
270                                     num_retries=num_retries,
271                                     )
272            else:
273                msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file))
274        except OSError as e:
275            if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH):
276                verb = 'connect to instance from'
277            elif config_file:
278                verb = 'open'
279            else:
280                verb = 'find'
281                searchlist = ":".join(str(p) for p in dirs.search_paths())
282                config_file = f'{instance_name}.conf in path {searchlist}'
283            msg.append(("Could not {} config file {}: {}").format(
284                       verb, config_file, e.strerror))
285        except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e:
286            msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e))
287
288    default_api = None
289    default_instance = None
290    try:
291        default_api = arvados.api('v1', num_retries=num_retries)
292        default_instance = default_api.config()["ClusterID"]
293    except ValueError:
294        pass
295    except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e:
296        msg.append("Failed to connect to default instance, error was {}".format(e))
297
298    if default_api is not None and (not instance_name or instance_name == default_instance):
299        # Use default settings
300        return default_api
301
302    if instance_name and default_instance and instance_name != default_instance:
303        msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name))
304
305    for m in msg:
306        logger.error(m)
307
308    abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')
309
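# Editor's note on the three accepted forms of instance_name (arguments
# are hypothetical):
#
#   api_for_instance('pirca', 3)            # searches config dirs for pirca.conf
#   api_for_instance('conf/pirca.conf', 3)  # anything containing '/' is a path
#   api_for_instance(None, 3)               # default client (environment/settings.conf)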
310# Check if git is available
311def check_git_availability():
312    try:
313        subprocess.run(
314            ['git', '--version'],
315            check=True,
316            stdout=subprocess.DEVNULL,
317        )
318    except FileNotFoundError:
319        abort('git command is not available. Please ensure git is installed.')
320
321
322def filter_iter(arg):
323    """Iterate a filter string-or-list.
324
325    Pass in a filter field that can either be a string or list.
326    This will iterate elements as if the field had been written as a list.
327    """
328    if isinstance(arg, str):
329        yield arg
330    else:
331        yield from arg
332
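# Editor's sketch (not part of the original module): a filter field may be
# written either as a bare string or as a list, and filter_iter makes both
# iterate the same way.
def _example_filter_iter():
    assert list(filter_iter('script_version')) == ['script_version']
    assert list(filter_iter(['a', 'b'])) == ['a', 'b']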
333def migrate_repository_filter(repo_filter, src_repository, dst_repository):
334    """Update a single repository filter in-place for the destination.
335
336    If the filter checks that the repository is src_repository, it is
337    updated to check that the repository is dst_repository.  If it does
338    anything else, this function raises ValueError.
339    """
340    if src_repository is None:
341        raise ValueError("component does not specify a source repository")
342    elif dst_repository is None:
343        raise ValueError("no destination repository specified to update repository filter")
344    elif repo_filter[1:] == ['=', src_repository]:
345        repo_filter[2] = dst_repository
346    elif repo_filter[1:] == ['in', [src_repository]]:
347        repo_filter[2] = [dst_repository]
348    else:
349        raise ValueError("repository filter is not a simple source match")
350
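# Editor's sketch of the one rewrite migrate_repository_filter performs
# (repository names are hypothetical); any other filter shape raises
# ValueError.
def _example_migrate_repository_filter():
    f = ['repository', '=', 'old/repo']
    migrate_repository_filter(f, 'old/repo', 'new/repo')
    assert f == ['repository', '=', 'new/repo']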
351def migrate_script_version_filter(version_filter):
352    """Update a single script_version filter in-place for the destination.
353
354    Currently this function checks that all the filter operands are Git
355    commit hashes.  If they're not, it raises ValueError to indicate that
356    the filter is not portable.  It could be extended to make other
357    transformations in the future.
358    """
359    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
360        raise ValueError("script_version filter is not limited to commit hashes")
361
362def attr_filtered(filter_, *attr_names):
363    """Return True if filter_ applies to any of attr_names, else False."""
364    return any((name == 'any') or (name in attr_names)
365               for name in filter_iter(filter_[0]))
366
367@contextlib.contextmanager
368def exception_handler(handler, *exc_types):
369    """If any exc_types are raised in the block, call handler on the exception."""
370    try:
371        yield
372    except exc_types as error:
373        handler(error)
374
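# Editor's sketch: exception_handler routes the named exception types to a
# handler instead of letting them propagate.
def _example_exception_handler():
    with exception_handler(logger.warning, FileNotFoundError):
        open('/nonexistent/path')  # logged as a warning rather than raised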
375
376# copy_workflow(wf_uuid, src, dst, args)
377#
378#    Copies a workflow identified by wf_uuid from src to dst.
379#
380#    If args.recursive is True, also copy any collections
381#      referenced in the workflow definition yaml.
382#
383#    The owner_uuid of the new workflow is set to any given
384#      project_uuid or the user who copied the template.
385#
386#    Returns the copied workflow object.
387#
388def copy_workflow(wf_uuid, src, dst, args):
389    # fetch the workflow from the source instance
390    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
391
392    if not wf["definition"]:
393        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
394
395    # copy collections and docker images
396    if args.recursive and wf["definition"]:
397        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
398               "ARVADOS_API_TOKEN": src.api_token,
399               "PATH": os.environ["PATH"]}
400        try:
401            result = subprocess.run(
402                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
403                env=env,
404                stdout=subprocess.PIPE,
405                universal_newlines=True,
406            )
407        except FileNotFoundError:
408            no_arv_copy = True
409        else:
410            no_arv_copy = result.returncode == 2
411
412        if no_arv_copy:
413            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
414        elif result.returncode != 0:
415            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
416
417        locations = json.loads(result.stdout)
418
419        if locations:
420            copy_collections(locations, src, dst, args)
421
422    # copy the workflow itself
423    del wf['uuid']
424    wf['owner_uuid'] = args.project_uuid
425
426    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
427                                             ["name", "=", wf["name"]]]).execute()
428    if len(existing["items"]) == 0:
429        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
430    else:
431        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
432
433
434def workflow_collections(obj, locations, docker_images):
435    if isinstance(obj, dict):
436        loc = obj.get('location', None)
437        if loc is not None:
438            if loc.startswith("keep:"):
439                locations.append(loc[5:])
440
441        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
442        if docker_image is not None:
443            ds = docker_image.split(":", 1)
444            tag = ds[1] if len(ds)==2 else 'latest'
445            docker_images[ds[0]] = tag
446
447        for x in obj:
448            workflow_collections(obj[x], locations, docker_images)
449    elif isinstance(obj, list):
450        for x in obj:
451            workflow_collections(x, locations, docker_images)
452
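# Editor's sketch with a small, hypothetical workflow fragment: walking the
# structure collects keep: locations and docker image tags.
def _example_workflow_collections():
    locations, docker_images = [], {}
    obj = {'steps': [{'location': 'keep:3229739b505d2b878b62aed09895a55a+142/input.fastq',
                      'dockerPull': 'ubuntu:20.04'}]}
    workflow_collections(obj, locations, docker_images)
    assert locations == ['3229739b505d2b878b62aed09895a55a+142/input.fastq']
    assert docker_images == {'ubuntu': '20.04'}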
453# copy_collections(obj, src, dst, args)
454#
455#    Recursively copies all collections referenced by 'obj' from src
456#    to dst.  obj may be a dict or a list, in which case we run
457#    copy_collections on every value it contains. If it is a string,
458#    search it for any substring that matches a collection hash or uuid
459#    (this will find hidden references to collections like
460#      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
461#
462#    Returns a copy of obj with any old collection uuids replaced by
463#    the new ones.
464#
465def copy_collections(obj, src, dst, args):
466
467    def copy_collection_fn(collection_match):
468        """Helper function for regex substitution: copies a single collection,
469        identified by the collection_match MatchObject, to the
470        destination.  Returns the destination collection uuid (or the
471        portable data hash if that's what src_id is).
472
473        """
474        src_id = collection_match.group(0)
475        if src_id not in collections_copied:
476            dst_col = copy_collection(src_id, src, dst, args)
477            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
478                collections_copied[src_id] = src_id
479            else:
480                collections_copied[src_id] = dst_col['uuid']
481        return collections_copied[src_id]
482
483    if isinstance(obj, str):
484        # Copy any collections identified in this string to dst, replacing
485        # them with the dst uuids as necessary.
486        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
487        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
488        return obj
489    elif isinstance(obj, dict):
490        return type(obj)((v, copy_collections(obj[v], src, dst, args))
491                         for v in obj)
492    elif isinstance(obj, list):
493        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
494    return obj
495
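# Editor's sketch: the string branch of copy_collections is driven by these
# patterns; this only demonstrates what they match (the hash is borrowed
# from the comment above).
def _example_collection_patterns():
    s = '$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129.fastq)'
    assert (arvados.util.portable_data_hash_pattern.findall(s)
            == ['3229739b505d2b878b62aed09895a55a+142'])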
496
497def total_collection_size(manifest_text):
498    """Return the total number of bytes in this collection (excluding
499    duplicate blocks)."""
500
501    total_bytes = 0
502    locators_seen = {}
503    for line in manifest_text.splitlines():
504        words = line.split()
505        for word in words[1:]:
506            try:
507                loc = arvados.KeepLocator(word)
508            except ValueError:
509                continue  # this word isn't a locator, skip it
510            if loc.md5sum not in locators_seen:
511                locators_seen[loc.md5sum] = True
512                total_bytes += loc.size
513
514    return total_bytes
515
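# Editor's sketch with a hypothetical two-line manifest: the byte count after
# '+' in each locator is summed, and the duplicated block is counted once.
def _example_total_collection_size():
    manifest = (". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"
                ". acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 0:6:bar.txt\n")
    assert total_collection_size(manifest) == 6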
516def create_collection_from(c, src, dst, args):
517    """Create a new collection record on dst, and copy Docker metadata if
518    available."""
519
520    collection_uuid = c['uuid']
521    body = {}
522    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
523        body[d] = c[d]
524
525    if not body["name"]:
526        body['name'] = "copied from " + collection_uuid
527
528    if args.storage_classes:
529        body['storage_classes_desired'] = args.storage_classes
530
531    body['owner_uuid'] = args.project_uuid
532
533    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
534
535    # Create docker_image_repo+tag and docker_image_hash links
536    # at the destination.
537    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
538        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
539
540        for src_link in docker_links:
541            body = {key: src_link[key]
542                    for key in ['link_class', 'name', 'properties']}
543            body['head_uuid'] = dst_collection['uuid']
544            body['owner_uuid'] = args.project_uuid
545
546            lk = dst.links().create(body=body).execute(num_retries=args.retries)
547            logger.debug('created dst link {}'.format(lk))
548
549    return dst_collection
550
551# copy_collection(obj_uuid, src, dst, args)
552#
553#    Copies the collection identified by obj_uuid from src to dst.
554#    Returns the collection object created at dst.
555#
556#    If args.progress is True, produce a human-friendly progress
557#    report.
558#
559#    If a collection with the desired portable_data_hash already
560#    exists at dst, and args.force is False, copy_collection returns
561#    the existing collection without copying any blocks.  Otherwise
562#    (if no collection exists or if args.force is True)
563#    copy_collection copies all of the collection data blocks from src
564#    to dst.
565#
566#    For this application, it is critical to preserve the
567#    collection's manifest hash, which is not guaranteed with the
568#    arvados.CollectionReader and arvados.CollectionWriter classes.
569#    Copying each block in the collection manually, followed by
570#    the manifest block, ensures that the collection's manifest
571#    hash will not change.
572#
573def copy_collection(obj_uuid, src, dst, args):
574    if arvados.util.keep_locator_pattern.match(obj_uuid):
575        # If the obj_uuid is a portable data hash, it might not be
576        # uniquely identified with a particular collection.  As a
 577        # result, it is ambiguous which name to use for the copy.
578        # Apply some heuristics to pick which collection to get the
579        # name from.
580        srccol = src.collections().list(
581            filters=[['portable_data_hash', '=', obj_uuid]],
582            order="created_at asc"
583            ).execute(num_retries=args.retries)
584
585        items = srccol.get("items")
586
587        if not items:
588            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
589            return
590
591        c = None
592
593        if len(items) == 1:
594            # There's only one collection with the PDH, so use that.
595            c = items[0]
596        if not c:
597            # See if there is a collection that's in the same project
598            # as the root item (usually a workflow) being copied.
599            for i in items:
600                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
601                    c = i
602                    break
603        if not c:
604            # Didn't find any collections located in the same project, so
605            # pick the oldest collection that has a name assigned to it.
606            for i in items:
607                if i.get("name"):
608                    c = i
609                    break
610        if not c:
611            # None of the collections have names (?!), so just pick the
612            # first one.
613            c = items[0]
614
 615        # list() doesn't return manifest text (and we don't want it to,
 616        # because we don't need the same manifest text sent to us 50
 617        # times), so go and retrieve the collection object directly,
 618        # which will include the manifest text.
619        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
620    else:
621        # Assume this is an actual collection uuid, so fetch it directly.
622        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
623
624    # If a collection with this hash already exists at the
625    # destination, and 'force' is not true, just return that
626    # collection.
627    if not args.force:
628        if 'portable_data_hash' in c:
629            colhash = c['portable_data_hash']
630        else:
631            colhash = c['uuid']
632        dstcol = dst.collections().list(
633            filters=[['portable_data_hash', '=', colhash]]
634        ).execute(num_retries=args.retries)
635        if dstcol['items_available'] > 0:
636            for d in dstcol['items']:
637                if ((args.project_uuid == d['owner_uuid']) and
638                    (c.get('name') == d['name']) and
639                    (c['portable_data_hash'] == d['portable_data_hash'])):
640                    return d
641            c['manifest_text'] = dst.collections().get(
642                uuid=dstcol['items'][0]['uuid']
643            ).execute(num_retries=args.retries)['manifest_text']
644            return create_collection_from(c, src, dst, args)
645
646    if args.replication is None:
647        # Obtain default or fallback collection replication setting on the
648        # destination
649        try:
650            args.replication = int(dst.config()["Collections"]["DefaultReplication"])
651        except (KeyError, TypeError, ValueError):
652            args.replication = 2
653
654    # Fetch the collection's manifest.
655    manifest = c['manifest_text']
656    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
657
658    # Copy each block from src_keep to dst_keep.
659    # Use the newly signed locators returned from dst_keep to build
660    # a new manifest as we go.
661    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
662    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
663    dst_manifest = io.StringIO()
664    dst_locators = {}
665    bytes_written = 0
666    bytes_expected = total_collection_size(manifest)
667    if args.progress:
668        progress_writer = ProgressWriter(human_progress)
669    else:
670        progress_writer = None
671
 672    # Transfer plan: put each block locator from the manifest on the
 673    # 'get' queue; 'get' threads fetch blocks and put them on the 'put'
 674    # queue; 'put' threads store blocks at the destination and record
 675    # the newly signed locators in dst_locators.
 676    #
 677    # After the whole manifest has been processed this way, go back
 678    # through it again and build dst_manifest.
679
680    lock = threading.Lock()
681
682    # the get queue should be unbounded because we'll add all the
683    # block hashes we want to get, but these are small
684    get_queue = queue.Queue()
685
686    threadcount = 4
687
688    # the put queue contains full data blocks
689    # and if 'get' is faster than 'put' we could end up consuming
690    # a great deal of RAM if it isn't bounded.
691    put_queue = queue.Queue(threadcount)
692    transfer_error = []
693
694    def get_thread():
695        while True:
696            word = get_queue.get()
697            if word is None:
698                put_queue.put(None)
699                get_queue.task_done()
700                return
701
702            blockhash = arvados.KeepLocator(word).md5sum
703            with lock:
704                if blockhash in dst_locators:
705                    # Already uploaded
706                    get_queue.task_done()
707                    continue
708
709            try:
710                logger.debug("Getting block %s", word)
711                data = src_keep.get(word)
712                put_queue.put((word, data))
713            except Exception as e:
714                logger.error("Error getting block %s: %s", word, e)
715                transfer_error.append(e)
716                try:
717                    # Drain the 'get' queue so we end early
718                    while True:
719                        get_queue.get(False)
720                        get_queue.task_done()
721                except queue.Empty:
722                    pass
723            finally:
724                get_queue.task_done()
725
726    def put_thread():
727        nonlocal bytes_written
728        while True:
729            item = put_queue.get()
730            if item is None:
731                put_queue.task_done()
732                return
733
734            word, data = item
735            loc = arvados.KeepLocator(word)
736            blockhash = loc.md5sum
737            with lock:
738                if blockhash in dst_locators:
739                    # Already uploaded
740                    put_queue.task_done()
741                    continue
742
743            try:
744                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
745                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
746                with lock:
747                    dst_locators[blockhash] = dst_locator
748                    bytes_written += loc.size
749                    if progress_writer:
750                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
751            except Exception as e:
752                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
753                try:
754                    # Drain the 'get' queue so we end early
755                    while True:
756                        get_queue.get(False)
757                        get_queue.task_done()
758                except queue.Empty:
759                    pass
760                transfer_error.append(e)
761            finally:
762                put_queue.task_done()
763
764    for line in manifest.splitlines():
765        words = line.split()
766        for word in words[1:]:
767            try:
768                loc = arvados.KeepLocator(word)
769            except ValueError:
770                # If 'word' can't be parsed as a locator,
771                # presume it's a filename.
772                continue
773
774            get_queue.put(word)
775
776    for i in range(0, threadcount):
777        get_queue.put(None)
778
779    for i in range(0, threadcount):
780        threading.Thread(target=get_thread, daemon=True).start()
781
782    for i in range(0, threadcount):
783        threading.Thread(target=put_thread, daemon=True).start()
784
785    get_queue.join()
786    put_queue.join()
787
788    if len(transfer_error) > 0:
789        return {"error_token": "Failed to transfer blocks"}
790
791    for line in manifest.splitlines():
792        words = line.split()
793        dst_manifest.write(words[0])
794        for word in words[1:]:
795            try:
796                loc = arvados.KeepLocator(word)
797            except ValueError:
798                # If 'word' can't be parsed as a locator,
799                # presume it's a filename.
800                dst_manifest.write(' ')
801                dst_manifest.write(word)
802                continue
803            blockhash = loc.md5sum
804            dst_manifest.write(' ')
805            dst_manifest.write(dst_locators[blockhash])
806        dst_manifest.write("\n")
807
808    if progress_writer:
809        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
810        progress_writer.finish()
811
812    # Copy the manifest and save the collection.
813    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
814
815    c['manifest_text'] = dst_manifest.getvalue()
816    return create_collection_from(c, src, dst, args)
817
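# Editor's sketch of the transfer pipeline above, reduced to its shape: an
# unbounded queue of small block locators feeds a bounded queue of full data
# blocks, so fetchers cannot race ahead of uploaders and balloon RAM.
# fetch and store are hypothetical stand-ins for src_keep.get and dst_keep.put.
def _example_bounded_pipeline(words, fetch, store, threadcount=4):
    get_q = queue.Queue()             # unbounded; holds only short strings
    put_q = queue.Queue(threadcount)  # bounded; holds whole data blocks

    def getter():
        while True:
            w = get_q.get()
            if w is None:
                put_q.put(None)       # pass one sentinel through per getter
                get_q.task_done()
                return
            put_q.put((w, fetch(w)))
            get_q.task_done()

    def putter():
        while True:
            item = put_q.get()
            if item is None:
                put_q.task_done()
                return
            w, data = item
            store(w, data)
            put_q.task_done()

    for w in words:
        get_q.put(w)
    for _ in range(threadcount):
        get_q.put(None)               # one sentinel per getter thread
        threading.Thread(target=getter, daemon=True).start()
        threading.Thread(target=putter, daemon=True).start()
    get_q.join()
    put_q.join()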
818def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
819    """Copy the docker image identified by docker_image and
820    docker_image_tag from src to dst. Create appropriate
821    docker_image_repo+tag and docker_image_hash links at dst.
822
823    """
824
825    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
826
827    # Find the link identifying this docker image.
828    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
829        src, args.retries, docker_image, docker_image_tag)
830    if docker_image_list:
831        image_uuid, image_info = docker_image_list[0]
832        logger.debug('copying collection {} {}'.format(image_uuid, image_info))
833
834        # Copy the collection it refers to.
835        dst_image_col = copy_collection(image_uuid, src, dst, args)
836    elif arvados.util.keep_locator_pattern.match(docker_image):
837        dst_image_col = copy_collection(docker_image, src, dst, args)
838    else:
839        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
840
841def copy_project(obj_uuid, src, dst, owner_uuid, args):
842
843    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
844
845    # Create/update the destination project
846    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
847                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
848    if len(existing["items"]) == 0:
849        project_record = dst.groups().create(body={"group": {"group_class": "project",
850                                                             "owner_uuid": owner_uuid,
851                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
852    else:
853        project_record = existing["items"][0]
854
855    dst.groups().update(uuid=project_record["uuid"],
856                        body={"group": {
857                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)
858
859    args.project_uuid = project_record["uuid"]
860
861    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
862
863
864    partial_error = ""
865
866    # Copy collections
867    try:
868        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
869                         src, dst, args)
870    except Exception as e:
871        partial_error += "\n" + str(e)
872
873    # Copy workflows
874    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
875        try:
876            copy_workflow(w["uuid"], src, dst, args)
877        except Exception as e:
878            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
879
880    if args.recursive:
881        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
882            try:
883                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
884            except Exception as e:
885                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
886
887    project_record["partial_error"] = partial_error
888
889    return project_record
890
891# git_rev_parse(rev, repo)
892#
893#    Returns the 40-character commit hash corresponding to 'rev' in
894#    git repository 'repo' (which must be the path of a local git
895#    repository)
896#
897def git_rev_parse(rev, repo):
898    proc = subprocess.run(
899        ['git', 'rev-parse', rev],
900        check=True,
901        cwd=repo,
902        stdout=subprocess.PIPE,
903        text=True,
904    )
 905    return proc.stdout.strip()  # proc.stdout is already a str because text=True
906
907# uuid_type(api, object_uuid)
908#
909#    Returns the name of the class that object_uuid belongs to, based on
910#    the second field of the uuid.  This function consults the api's
911#    schema to identify the object class.
912#
913#    It returns a string such as 'Collection', 'Workflow', etc.
914#
915#    Special case: if handed a Keep locator hash, return 'Collection'.
916#
917def uuid_type(api, object_uuid):
918    if re.match(arvados.util.keep_locator_pattern, object_uuid):
919        return 'Collection'
920
921    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
922        return 'httpURL'
923
924    p = object_uuid.split('-')
925    if len(p) == 3:
926        type_prefix = p[1]
927        for k in api._schema.schemas:
928            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
929            if type_prefix == obj_class:
930                return k
931    return None
932
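# Editor's note (UUIDs hypothetical): the second dash-separated field is
# matched against each schema's uuidPrefix; '4zz18', for example, is the
# prefix for collections.
#
#   uuid_type(api, 'zzzzz-4zz18-aaaaaaaaaaaaaaa')    # -> 'Collection'
#   uuid_type(api, 'https://example.com/data.tar')   # -> 'httpURL'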
933
934def copy_from_http(url, src, dst, args):
935
936    project_uuid = args.project_uuid
 937    prefer_cached_downloads = args.prefer_cached_downloads
 938
 939    # Check whether this URL has already been downloaded and cached in Keep.
940    cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
941                                           varying_url_params=args.varying_url_params,
942                                           prefer_cached_downloads=prefer_cached_downloads)
943    if cached[2] is not None:
944        return copy_collection(cached[2], src, dst, args)
945
946    cached = http_to_keep.http_to_keep(dst, project_uuid, url,
947                                       varying_url_params=args.varying_url_params,
948                                       prefer_cached_downloads=prefer_cached_downloads)
949
950    if cached is not None:
951        return {"uuid": cached[2]}
952
953
954def abort(msg, code=1):
955    logger.info("arv-copy: %s", msg)
956    exit(code)
957
958
959# Code for reporting on the progress of a collection upload.
960# Stolen from arvados.commands.put.ArvPutCollectionWriter
961# TODO(twp): figure out how to refactor into a shared library
962# (may involve refactoring some arvados.commands.arv_copy.copy_collection
963# code)
964
965def machine_progress(obj_uuid, bytes_written, bytes_expected):
966    return "{} {}: {} {} written {} total\n".format(
967        sys.argv[0],
968        os.getpid(),
969        obj_uuid,
970        bytes_written,
971        -1 if (bytes_expected is None) else bytes_expected)
972
973def human_progress(obj_uuid, bytes_written, bytes_expected):
974    if bytes_expected:
975        return "\r{}: {}M / {}M {:.1%} ".format(
976            obj_uuid,
977            bytes_written >> 20, bytes_expected >> 20,
978            float(bytes_written) / bytes_expected)
979    else:
980        return "\r{}: {} ".format(obj_uuid, bytes_written)
981
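# Editor's sketch of the two report formats (values hypothetical):
#
#   human_progress('zzzzz-4zz18-aaaaaaaaaaaaaaa', 3 << 20, 10 << 20)
#   # -> '\rzzzzz-4zz18-aaaaaaaaaaaaaaa: 3M / 10M 30.0% '
#
#   machine_progress('zzzzz-4zz18-aaaaaaaaaaaaaaa', 100, 200)
#   # -> 'arv-copy 12345: zzzzz-4zz18-aaaaaaaaaaaaaaa 100 written 200 total\n'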
982class ProgressWriter(object):
983    _progress_func = None
984    outfile = sys.stderr
985
986    def __init__(self, progress_func):
987        self._progress_func = progress_func
988
989    def report(self, obj_uuid, bytes_written, bytes_expected):
990        if self._progress_func is not None:
991            self.outfile.write(
992                self._progress_func(obj_uuid, bytes_written, bytes_expected))
993
994    def finish(self):
995        self.outfile.write("\n")
996
997if __name__ == '__main__':
998    main()
COMMIT_HASH_RE = re.compile('^[0-9a-f]{1,40}$')
logger = <Logger arvados.arv-copy (WARNING)>
googleapi_logger = <Logger googleapiclient.http (WARNING)>
local_repo_dir = {}
collections_copied = {}
scripts_copied = set()
src_owner_uuid = None
def main():
 78def main():
 79    copy_opts = argparse.ArgumentParser(add_help=False)
 80
 81    copy_opts.add_argument(
 82        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
 83        help='Print version and exit.')
 84    copy_opts.add_argument(
 85        '-v', '--verbose', dest='verbose', action='store_true',
 86        help='Verbose output.')
 87    copy_opts.add_argument(
 88        '--progress', dest='progress', action='store_true',
 89        help='Report progress on copying collections. (default)')
 90    copy_opts.add_argument(
 91        '--no-progress', dest='progress', action='store_false',
 92        help='Do not report progress on copying collections.')
 93    copy_opts.add_argument(
 94        '-f', '--force', dest='force', action='store_true',
 95        help='Perform copy even if the object appears to exist at the remote destination.')
 96    copy_opts.add_argument(
 97        '--src', dest='source_arvados',
 98        help="""
 99Client configuration location for the source Arvados cluster.
100May be either a configuration file path, or a plain identifier like `foo`
101to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
102If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
103""",
104    )
105    copy_opts.add_argument(
106        '--dst', dest='destination_arvados',
107        help="""
108Client configuration location for the destination Arvados cluster.
109May be either a configuration file path, or a plain identifier like `foo`
110to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
111If not provided, will use the default client configuration from the environment or `settings.conf`.
112""",
113    )
114    copy_opts.add_argument(
115        '--recursive', dest='recursive', action='store_true',
116        help='Recursively copy any dependencies for this object, and subprojects. (default)')
117    copy_opts.add_argument(
118        '--no-recursive', dest='recursive', action='store_false',
119        help='Do not copy any dependencies or subprojects.')
120    copy_opts.add_argument(
121        '--project-uuid', dest='project_uuid',
122        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
123    copy_opts.add_argument(
124        '--replication',
125        type=arv_cmd.RangedValue(int, range(1, sys.maxsize)),
126        metavar='N',
127        help="""
128Number of replicas per storage class for the copied collections at the destination.
129If not provided (or if provided with invalid value),
130use the destination's default replication-level setting (if found),
131or the fallback value 2.
132""")
133    copy_opts.add_argument(
134        '--storage-classes',
135        type=arv_cmd.UniqueSplit(),
136        help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
137    copy_opts.add_argument("--varying-url-params", type=str, default="",
138                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
139
140    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
141                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
142
143    copy_opts.add_argument(
144        'object_uuid',
145        help='The UUID of the object to be copied.')
146    copy_opts.set_defaults(progress=True)
147    copy_opts.set_defaults(recursive=True)
148
149    parser = argparse.ArgumentParser(
150        description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
151        parents=[copy_opts, arv_cmd.retry_opt])
152    args = parser.parse_args()
153
154    if args.verbose:
155        logger.setLevel(logging.DEBUG)
156    else:
157        logger.setLevel(logging.INFO)
158
159    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
160        args.source_arvados = args.object_uuid[:5]
161
162    if not args.destination_arvados and args.project_uuid:
163        args.destination_arvados = args.project_uuid[:5]
164
165    # Make sure errors trying to connect to clusters get logged.
166    googleapi_logger.setLevel(logging.WARN)
167    googleapi_logger.addHandler(log_handler)
168
169    # Create API clients for the source and destination instances
170    src_arv = api_for_instance(args.source_arvados, args.retries)
171    dst_arv = api_for_instance(args.destination_arvados, args.retries)
172
173    # Once we've successfully contacted the clusters, we probably
174    # don't want to see logging about retries (unless the user asked
175    # for verbose output).
176    if not args.verbose:
177        googleapi_logger.setLevel(logging.ERROR)
178
179    if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]:
180        logger.info("Copying within cluster %s", src_arv.config()["ClusterID"])
181    else:
182        logger.info("Source cluster is %s", src_arv.config()["ClusterID"])
183        logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"])
184
185    if not args.project_uuid:
186        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
187
188    # Identify the kind of object we have been given, and begin copying.
189    t = uuid_type(src_arv, args.object_uuid)
190
191    try:
192        if t == 'Collection':
193            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
194            result = copy_collection(args.object_uuid,
195                                     src_arv, dst_arv,
196                                     args)
197        elif t == 'Workflow':
198            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
199            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
200        elif t == 'Group':
201            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
202            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
203        elif t == 'httpURL':
204            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
205        else:
206            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
207    except Exception as e:
208        logger.error("%s", e, exc_info=args.verbose)
209        exit(1)
210
211    # Clean up any outstanding temp git repositories.
212    for d in local_repo_dir.values():
213        shutil.rmtree(d, ignore_errors=True)
214
215    if not result:
216        exit(1)
217
218    # If no exception was thrown and the response does not have an
219    # error_token field, presume success
220    if result is None or 'error_token' in result or 'uuid' not in result:
221        if result:
222            logger.error("API server returned an error result: {}".format(result))
223        exit(1)
224
225    print(result['uuid'])
226
227    if result.get('partial_error'):
228        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
229        exit(1)
230
231    logger.info("Success: created copy with uuid {}".format(result['uuid']))
232    exit(0)
def set_src_owner_uuid(resource, uuid, args):
234def set_src_owner_uuid(resource, uuid, args):
235    global src_owner_uuid
236    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
237    src_owner_uuid = c.get("owner_uuid")
def api_for_instance(instance_name, num_retries):
251def api_for_instance(instance_name, num_retries):
252    msg = []
253    if instance_name:
254        if '/' in instance_name:
255            config_file = instance_name
256        else:
257            dirs = basedirs.BaseDirectories('CONFIG')
258            config_file = next(dirs.search(f'{instance_name}.conf'), '')
259
260        try:
261            cfg = arvados.config.load(config_file)
262
263            if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
264                api_is_insecure = (
265                    cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
266                        ['1', 't', 'true', 'y', 'yes']))
267                return arvados.api('v1',
268                                     host=cfg['ARVADOS_API_HOST'],
269                                     token=cfg['ARVADOS_API_TOKEN'],
270                                     insecure=api_is_insecure,
271                                     num_retries=num_retries,
272                                     )
273            else:
274                msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file))
275        except OSError as e:
276            if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH):
277                verb = 'connect to instance from'
278            elif config_file:
279                verb = 'open'
280            else:
281                verb = 'find'
282                searchlist = ":".join(str(p) for p in dirs.search_paths())
283                config_file = f'{instance_name}.conf in path {searchlist}'
284            msg.append(("Could not {} config file {}: {}").format(
285                       verb, config_file, e.strerror))
286        except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e:
287            msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e))
288
289    default_api = None
290    default_instance = None
291    try:
292        default_api = arvados.api('v1', num_retries=num_retries)
293        default_instance = default_api.config()["ClusterID"]
294    except ValueError:
295        pass
296    except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e:
297        msg.append("Failed to connect to default instance, error was {}".format(e))
298
299    if default_api is not None and (not instance_name or instance_name == default_instance):
300        # Use default settings
301        return default_api
302
303    if instance_name and default_instance and instance_name != default_instance:
304        msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name))
305
306    for m in msg:
307        logger.error(m)
308
309    abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')
def check_git_availability():
312def check_git_availability():
313    try:
314        subprocess.run(
315            ['git', '--version'],
316            check=True,
317            stdout=subprocess.DEVNULL,
318        )
319    except FileNotFoundError:
320        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
323def filter_iter(arg):
324    """Iterate a filter string-or-list.
325
326    Pass in a filter field that can either be a string or list.
327    This will iterate elements as if the field had been written as a list.
328    """
329    if isinstance(arg, str):
330        yield arg
331    else:
332        yield from arg

Iterate a filter string-or-list.

Pass in a filter field that can either be a string or list. This will iterate elements as if the field had been written as a list.

def migrate_repository_filter(repo_filter, src_repository, dst_repository):
334def migrate_repository_filter(repo_filter, src_repository, dst_repository):
335    """Update a single repository filter in-place for the destination.
336
337    If the filter checks that the repository is src_repository, it is
338    updated to check that the repository is dst_repository.  If it does
339    anything else, this function raises ValueError.
340    """
341    if src_repository is None:
342        raise ValueError("component does not specify a source repository")
343    elif dst_repository is None:
344        raise ValueError("no destination repository specified to update repository filter")
345    elif repo_filter[1:] == ['=', src_repository]:
346        repo_filter[2] = dst_repository
347    elif repo_filter[1:] == ['in', [src_repository]]:
348        repo_filter[2] = [dst_repository]
349    else:
350        raise ValueError("repository filter is not a simple source match")

Update a single repository filter in-place for the destination.

If the filter checks that the repository is src_repository, it is updated to check that the repository is dst_repository. If it does anything else, this function raises ValueError.

def migrate_script_version_filter(version_filter):
352def migrate_script_version_filter(version_filter):
353    """Update a single script_version filter in-place for the destination.
354
355    Currently this function checks that all the filter operands are Git
356    commit hashes.  If they're not, it raises ValueError to indicate that
357    the filter is not portable.  It could be extended to make other
358    transformations in the future.
359    """
360    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
361        raise ValueError("script_version filter is not limited to commit hashes")

Update a single script_version filter in-place for the destination.

Currently this function checks that all the filter operands are Git commit hashes. If they’re not, it raises ValueError to indicate that the filter is not portable. It could be extended to make other transformations in the future.

def attr_filtered(filter_, *attr_names):
363def attr_filtered(filter_, *attr_names):
364    """Return True if filter_ applies to any of attr_names, else False."""
365    return any((name == 'any') or (name in attr_names)
366               for name in filter_iter(filter_[0]))

Return True if filter_ applies to any of attr_names, else False.

@contextlib.contextmanager
def exception_handler(handler, *exc_types):
368@contextlib.contextmanager
369def exception_handler(handler, *exc_types):
370    """If any exc_types are raised in the block, call handler on the exception."""
371    try:
372        yield
373    except exc_types as error:
374        handler(error)

If any exc_types are raised in the block, call handler on the exception.

def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        try:
            result = subprocess.run(
                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
                env=env,
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
        except FileNotFoundError:
            no_arv_copy = True
        else:
            no_arv_copy = result.returncode == 2

        if no_arv_copy:
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        locations = json.loads(result.stdout)

        if locations:
            copy_collections(locations, src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid

    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    else:
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
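
For context, a sketch of the subprocess exchange above (the exact output format is an assumption inferred from the json.loads call): arvados-cwl-runner --print-keep-deps writes a JSON list of Keep references to stdout, e.g.

["d41d8cd98f00b204e9800998ecf8427e+0", "zzzzz-4zz18-0123456789abcde"]

which copy_collections then copies one by one.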
def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
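
A sketch of the traversal over a CWL-like structure (hypothetical values):

locations, docker_images = [], {}
workflow_collections(
    {"steps": [{"location": "keep:d41d8cd98f00b204e9800998ecf8427e+0/input.txt",
                "dockerPull": "ubuntu:20.04"}]},
    locations, docker_images)
# locations     -> ['d41d8cd98f00b204e9800998ecf8427e+0/input.txt']
# docker_images -> {'ubuntu': '20.04'}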
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).

        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, str):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj

def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes

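For example, with a hypothetical manifest that names the same 3-byte block twice:

manifest = (". acbd18db4cc2f85cedef654fccc4a4d8+3"
            " 37b51d194a7513e45b56f6524f2d51f2+3"
            " acbd18db4cc2f85cedef654fccc4a4d8+3 0:9:data.txt\n")
total_collection_size(manifest)  # 6: the repeated locator is only counted once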

def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    body = {}
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        body[d] = c[d]

    if not body["name"]:
        body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection


def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
            ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    if args.replication is None:
        # Obtain default or fallback collection replication setting on the
        # destination
        try:
            args.replication = int(dst.config()["Collections"]["DefaultReplication"])
        except (KeyError, TypeError, ValueError):
            args.replication = 2

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest
    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    threadcount = 4

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()

    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()

    for line in manifest.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                continue

            get_queue.put(word)

    for i in range(0, threadcount):
        get_queue.put(None)

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    get_queue.join()
    put_queue.join()

    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest.write(words[0])
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
                continue
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)

def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))


def copy_project(obj_uuid, src, dst, owner_uuid, args):

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    else:
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        body={"group": {
                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])


    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record

def git_rev_parse(rev, repo):
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        text=True,
    )
    # With text=True and stdout=PIPE, proc.stdout is already a string.
    return proc.stdout.strip()
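
For example (hypothetical checkout path and hash):

git_rev_parse('HEAD', '/tmp/gitfHkV9lu44A')  # -> '1de84a854e2b440dc53bf42f8548afa4c17da332'
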
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
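
The dispatch hinges on identifier shape: Arvados UUIDs are three dash-separated fields, and the middle field is the uuidPrefix of an object type in the API schema. A sketch, assuming an initialized API client api and hypothetical identifiers:

uuid_type(api, 'd41d8cd98f00b204e9800998ecf8427e+0')  # 'Collection' (bare Keep locator)
uuid_type(api, 'https://example.com/data.tar')        # 'httpURL'
uuid_type(api, 'zzzzz-4zz18-0123456789abcde')         # 'Collection' ('4zz18' is the collection prefix)
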
def copy_from_http(url, src, dst, args):

    project_uuid = args.project_uuid
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
                                           varying_url_params=args.varying_url_params,
                                           prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    cached = http_to_keep.http_to_keep(dst, project_uuid, url,
                                       varying_url_params=args.varying_url_params,
                                       prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}

def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
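
For illustration, 50 MiB written out of 100 MiB expected (hypothetical UUID):

human_progress('zzzzz-4zz18-0123456789abcde', 50 << 20, 100 << 20)
# '\rzzzzz-4zz18-0123456789abcde: 50M / 100M 50.0% '
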
class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")