arvados.commands.arv_copy

  1# Copyright (C) The Arvados Authors. All rights reserved.
  2#
  3# SPDX-License-Identifier: Apache-2.0
  4
  5# arv-copy [--recursive] [--no-recursive] object-uuid
  6#
  7# Copies an object from Arvados instance src to instance dst.
  8#
  9# By default, arv-copy recursively copies any dependent objects
 10# necessary to make the object functional in the new instance
 11# (e.g. for a workflow, arv-copy copies the workflow,
 12# input collections, and docker images). If
 13# --no-recursive is given, arv-copy copies only the single record
 14# identified by object-uuid.
 15#
 16# The user must have files $HOME/.config/arvados/{src}.conf and
 17# $HOME/.config/arvados/{dst}.conf with valid login credentials for
 18# instances src and dst.  If either of these files is not found,
 19# arv-copy will issue an error.
 20
 21from __future__ import division
 22from future import standard_library
 23from future.utils import listvalues
 24standard_library.install_aliases()
 25from past.builtins import basestring
 26from builtins import object
 27import argparse
 28import contextlib
 29import getpass
 30import os
 31import re
 32import shutil
 33import subprocess
 34import sys
 35import logging
 36import tempfile
 37import urllib.parse
 38import io
 39import json
 40import queue
 41import threading
 42
 43import arvados
 44import arvados.config
 45import arvados.keep
 46import arvados.util
 47import arvados.commands._util as arv_cmd
 48import arvados.commands.keepdocker
 49import arvados.http_to_keep
 50import ruamel.yaml as yaml
 51
 52from arvados._version import __version__
 53
 54COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
 55
 56logger = logging.getLogger('arvados.arv-copy')
 57
 58# local_repo_dir records which git repositories from the Arvados source
 59# instance have been checked out locally during this run, and to which
 60# directories.
 61# e.g. if repository 'twp' from src_arv has been cloned into
 62# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
 63#
 64local_repo_dir = {}
 65
 66# List of collections that have been copied in this session, and their
 67# destination collection UUIDs.
 68collections_copied = {}
 69
 70# Set of (repository, script_version) two-tuples of commits copied in git.
 71scripts_copied = set()
 72
 73# The owner_uuid of the object being copied
 74src_owner_uuid = None
 75
 76def main():
 77    copy_opts = argparse.ArgumentParser(add_help=False)
 78
 79    copy_opts.add_argument(
 80        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
 81        help='Print version and exit.')
 82    copy_opts.add_argument(
 83        '-v', '--verbose', dest='verbose', action='store_true',
 84        help='Verbose output.')
 85    copy_opts.add_argument(
 86        '--progress', dest='progress', action='store_true',
 87        help='Report progress on copying collections. (default)')
 88    copy_opts.add_argument(
 89        '--no-progress', dest='progress', action='store_false',
 90        help='Do not report progress on copying collections.')
 91    copy_opts.add_argument(
 92        '-f', '--force', dest='force', action='store_true',
 93        help='Perform copy even if the object appears to exist at the remote destination.')
 94    copy_opts.add_argument(
 95        '--src', dest='source_arvados',
 96        help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
 97    copy_opts.add_argument(
 98        '--dst', dest='destination_arvados',
 99        help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
100    copy_opts.add_argument(
101        '--recursive', dest='recursive', action='store_true',
102        help='Recursively copy any dependencies for this object, and subprojects. (default)')
103    copy_opts.add_argument(
104        '--no-recursive', dest='recursive', action='store_false',
105        help='Do not copy any dependencies or subprojects.')
106    copy_opts.add_argument(
107        '--project-uuid', dest='project_uuid',
108        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
109    copy_opts.add_argument(
110        '--storage-classes', dest='storage_classes',
111        help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
112    copy_opts.add_argument("--varying-url-params", type=str, default="",
113                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
114
115    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
116                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
117
118    copy_opts.add_argument(
119        'object_uuid',
120        help='The UUID of the object to be copied.')
121    copy_opts.set_defaults(progress=True)
122    copy_opts.set_defaults(recursive=True)
123
124    parser = argparse.ArgumentParser(
125        description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
126        parents=[copy_opts, arv_cmd.retry_opt])
127    args = parser.parse_args()
128
129    if args.storage_classes:
130        args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
131
132    if args.verbose:
133        logger.setLevel(logging.DEBUG)
134    else:
135        logger.setLevel(logging.INFO)
136
137    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
138        args.source_arvados = args.object_uuid[:5]
139
140    # Create API clients for the source and destination instances
141    src_arv = api_for_instance(args.source_arvados, args.retries)
142    dst_arv = api_for_instance(args.destination_arvados, args.retries)
143
144    if not args.project_uuid:
145        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
146
147    # Identify the kind of object we have been given, and begin copying.
148    t = uuid_type(src_arv, args.object_uuid)
149
150    try:
151        if t == 'Collection':
152            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
153            result = copy_collection(args.object_uuid,
154                                     src_arv, dst_arv,
155                                     args)
156        elif t == 'Workflow':
157            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
158            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
159        elif t == 'Group':
160            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
161            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
162        elif t == 'httpURL':
163            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
164        else:
165            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
166    except Exception as e:
167        logger.error("%s", e, exc_info=args.verbose)
168        exit(1)
169
170    # Clean up any outstanding temp git repositories.
171    for d in listvalues(local_repo_dir):
172        shutil.rmtree(d, ignore_errors=True)
173
174    if not result:
175        exit(1)
176
177    # If no exception was thrown and the response does not have an
178    # error_token field, presume success
179    if result is None or 'error_token' in result or 'uuid' not in result:
180        if result:
181            logger.error("API server returned an error result: {}".format(result))
182        exit(1)
183
184    print(result['uuid'])
185
186    if result.get('partial_error'):
187        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
188        exit(1)
189
190    logger.info("Success: created copy with uuid {}".format(result['uuid']))
191    exit(0)
192
193def set_src_owner_uuid(resource, uuid, args):
194    global src_owner_uuid
195    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
196    src_owner_uuid = c.get("owner_uuid")
197
198# api_for_instance(instance_name)
199#
200#     Creates an API client for the Arvados instance identified by
201#     instance_name.
202#
203#     If instance_name contains a slash, it is presumed to be a path
204#     (either local or absolute) to a file with Arvados configuration
205#     settings.
206#
207#     Otherwise, it is presumed to be the name of a file in
208#     $HOME/.config/arvados/instance_name.conf
209#
210def api_for_instance(instance_name, num_retries):
211    if not instance_name:
212        # Use environment
213        return arvados.api('v1')
214
215    if '/' in instance_name:
216        config_file = instance_name
217    else:
218        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
219
220    try:
221        cfg = arvados.config.load(config_file)
222    except (IOError, OSError) as e:
223        abort(("Could not open config file {}: {}\n" +
224               "You must make sure that your configuration tokens\n" +
225               "for Arvados instance {} are in {} and that this\n" +
226               "file is readable.").format(
227                   config_file, e, instance_name, config_file))
228
229    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
230        api_is_insecure = (
231            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
232                ['1', 't', 'true', 'y', 'yes']))
233        client = arvados.api('v1',
234                             host=cfg['ARVADOS_API_HOST'],
235                             token=cfg['ARVADOS_API_TOKEN'],
236                             insecure=api_is_insecure,
237                             num_retries=num_retries,
238                             )
239    else:
240        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
241    return client
242
243# Check if git is available
244def check_git_availability():
245    try:
246        subprocess.run(
247            ['git', '--version'],
248            check=True,
249            stdout=subprocess.DEVNULL,
250        )
251    except FileNotFoundError:
252        abort('git command is not available. Please ensure git is installed.')
253
254
255def filter_iter(arg):
256    """Iterate a filter string-or-list.
257
258    Pass in a filter field that can either be a string or list.
259    This will iterate elements as if the field had been written as a list.
260    """
261    if isinstance(arg, basestring):
262        return iter((arg,))
263    else:
264        return iter(arg)
265
266def migrate_repository_filter(repo_filter, src_repository, dst_repository):
267    """Update a single repository filter in-place for the destination.
268
269    If the filter checks that the repository is src_repository, it is
270    updated to check that the repository is dst_repository.  If it does
271    anything else, this function raises ValueError.
272    """
273    if src_repository is None:
274        raise ValueError("component does not specify a source repository")
275    elif dst_repository is None:
276        raise ValueError("no destination repository specified to update repository filter")
277    elif repo_filter[1:] == ['=', src_repository]:
278        repo_filter[2] = dst_repository
279    elif repo_filter[1:] == ['in', [src_repository]]:
280        repo_filter[2] = [dst_repository]
281    else:
282        raise ValueError("repository filter is not a simple source match")
283
284def migrate_script_version_filter(version_filter):
285    """Update a single script_version filter in-place for the destination.
286
287    Currently this function checks that all the filter operands are Git
288    commit hashes.  If they're not, it raises ValueError to indicate that
289    the filter is not portable.  It could be extended to make other
290    transformations in the future.
291    """
292    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
293        raise ValueError("script_version filter is not limited to commit hashes")
294
295def attr_filtered(filter_, *attr_names):
296    """Return True if filter_ applies to any of attr_names, else False."""
297    return any((name == 'any') or (name in attr_names)
298               for name in filter_iter(filter_[0]))
299
300@contextlib.contextmanager
301def exception_handler(handler, *exc_types):
302    """If any exc_types are raised in the block, call handler on the exception."""
303    try:
304        yield
305    except exc_types as error:
306        handler(error)
307
308
309# copy_workflow(wf_uuid, src, dst, args)
310#
311#    Copies a workflow identified by wf_uuid from src to dst.
312#
313#    If args.recursive is True, also copy any collections
314#      referenced in the workflow definition yaml.
315#
316#    The owner_uuid of the new workflow is set to any given
317#      project_uuid or the user who copied the template.
318#
319#    Returns the copied workflow object.
320#
321def copy_workflow(wf_uuid, src, dst, args):
322    # fetch the workflow from the source instance
323    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
324
325    if not wf["definition"]:
326        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
327
328    # copy collections and docker images
329    if args.recursive and wf["definition"]:
330        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
331               "ARVADOS_API_TOKEN": src.api_token,
332               "PATH": os.environ["PATH"]}
333        try:
334            result = subprocess.run(
335                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
336                env=env,
337                stdout=subprocess.PIPE,
338                universal_newlines=True,
339            )
340        except FileNotFoundError:
341            no_arv_copy = True
342        else:
343            no_arv_copy = result.returncode == 2
344
345        if no_arv_copy:
346            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
347        elif result.returncode != 0:
348            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
349
350        locations = json.loads(result.stdout)
351
352        if locations:
353            copy_collections(locations, src, dst, args)
354
355    # copy the workflow itself
356    del wf['uuid']
357    wf['owner_uuid'] = args.project_uuid
358
359    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
360                                             ["name", "=", wf["name"]]]).execute()
361    if len(existing["items"]) == 0:
362        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
363    else:
364        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
365
366
367def workflow_collections(obj, locations, docker_images):
368    if isinstance(obj, dict):
369        loc = obj.get('location', None)
370        if loc is not None:
371            if loc.startswith("keep:"):
372                locations.append(loc[5:])
373
374        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
375        if docker_image is not None:
376            ds = docker_image.split(":", 1)
377            tag = ds[1] if len(ds)==2 else 'latest'
378            docker_images[ds[0]] = tag
379
380        for x in obj:
381            workflow_collections(obj[x], locations, docker_images)
382    elif isinstance(obj, list):
383        for x in obj:
384            workflow_collections(x, locations, docker_images)
385
386# copy_collections(obj, src, dst, args)
387#
388#    Recursively copies all collections referenced by 'obj' from src
389#    to dst.  obj may be a dict or a list, in which case we run
390#    copy_collections on every value it contains. If it is a string,
391#    search it for any substring that matches a collection hash or uuid
392#    (this will find hidden references to collections like
393#      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
394#
395#    Returns a copy of obj with any old collection uuids replaced by
396#    the new ones.
397#
398def copy_collections(obj, src, dst, args):
399
400    def copy_collection_fn(collection_match):
401        """Helper function for regex substitution: copies a single collection,
402        identified by the collection_match MatchObject, to the
403        destination.  Returns the destination collection uuid (or the
404        portable data hash if that's what src_id is).
405
406        """
407        src_id = collection_match.group(0)
408        if src_id not in collections_copied:
409            dst_col = copy_collection(src_id, src, dst, args)
410            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
411                collections_copied[src_id] = src_id
412            else:
413                collections_copied[src_id] = dst_col['uuid']
414        return collections_copied[src_id]
415
416    if isinstance(obj, basestring):
417        # Copy any collections identified in this string to dst, replacing
418        # them with the dst uuids as necessary.
419        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
420        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
421        return obj
422    elif isinstance(obj, dict):
423        return type(obj)((v, copy_collections(obj[v], src, dst, args))
424                         for v in obj)
425    elif isinstance(obj, list):
426        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
427    return obj
428
429
430def total_collection_size(manifest_text):
431    """Return the total number of bytes in this collection (excluding
432    duplicate blocks)."""
433
434    total_bytes = 0
435    locators_seen = {}
436    for line in manifest_text.splitlines():
437        words = line.split()
438        for word in words[1:]:
439            try:
440                loc = arvados.KeepLocator(word)
441            except ValueError:
442                continue  # this word isn't a locator, skip it
443            if loc.md5sum not in locators_seen:
444                locators_seen[loc.md5sum] = True
445                total_bytes += loc.size
446
447    return total_bytes
448
449def create_collection_from(c, src, dst, args):
450    """Create a new collection record on dst, and copy Docker metadata if
451    available."""
452
453    collection_uuid = c['uuid']
454    body = {}
455    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
456        body[d] = c[d]
457
458    if not body["name"]:
459        body['name'] = "copied from " + collection_uuid
460
461    if args.storage_classes:
462        body['storage_classes_desired'] = args.storage_classes
463
464    body['owner_uuid'] = args.project_uuid
465
466    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
467
468    # Create docker_image_repo+tag and docker_image_hash links
469    # at the destination.
470    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
471        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
472
473        for src_link in docker_links:
474            body = {key: src_link[key]
475                    for key in ['link_class', 'name', 'properties']}
476            body['head_uuid'] = dst_collection['uuid']
477            body['owner_uuid'] = args.project_uuid
478
479            lk = dst.links().create(body=body).execute(num_retries=args.retries)
480            logger.debug('created dst link {}'.format(lk))
481
482    return dst_collection
483
484# copy_collection(obj_uuid, src, dst, args)
485#
486#    Copies the collection identified by obj_uuid from src to dst.
487#    Returns the collection object created at dst.
488#
489#    If args.progress is True, produce a human-friendly progress
490#    report.
491#
492#    If a collection with the desired portable_data_hash already
493#    exists at dst, and args.force is False, copy_collection returns
494#    the existing collection without copying any blocks.  Otherwise
495#    (if no collection exists or if args.force is True)
496#    copy_collection copies all of the collection data blocks from src
497#    to dst.
498#
499#    For this application, it is critical to preserve the
500#    collection's manifest hash, which is not guaranteed with the
501#    arvados.CollectionReader and arvados.CollectionWriter classes.
502#    Copying each block in the collection manually, followed by
503#    the manifest block, ensures that the collection's manifest
504#    hash will not change.
505#
506def copy_collection(obj_uuid, src, dst, args):
507    if arvados.util.keep_locator_pattern.match(obj_uuid):
508        # If the obj_uuid is a portable data hash, it might not be
509        # uniquely identified with a particular collection.  As a
510        # result, it is ambiguous as to what name to use for the copy.
511        # Apply some heuristics to pick which collection to get the
512        # name from.
513        srccol = src.collections().list(
514            filters=[['portable_data_hash', '=', obj_uuid]],
515            order="created_at asc"
516            ).execute(num_retries=args.retries)
517
518        items = srccol.get("items")
519
520        if not items:
521            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
522            return
523
524        c = None
525
526        if len(items) == 1:
527            # There's only one collection with the PDH, so use that.
528            c = items[0]
529        if not c:
530            # See if there is a collection that's in the same project
531            # as the root item (usually a workflow) being copied.
532            for i in items:
533                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
534                    c = i
535                    break
536        if not c:
537            # Didn't find any collections located in the same project, so
538            # pick the oldest collection that has a name assigned to it.
539            for i in items:
540                if i.get("name"):
541                    c = i
542                    break
543        if not c:
544            # None of the collections have names (?!), so just pick the
545            # first one.
546            c = items[0]
547
548        # list() doesn't return manifest text (and we don't want it to,
549        # because we don't need the same maninfest text sent to us 50
550        # times) so go and retrieve the collection object directly
551        # which will include the manifest text.
552        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
553    else:
554        # Assume this is an actual collection uuid, so fetch it directly.
555        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
556
557    # If a collection with this hash already exists at the
558    # destination, and 'force' is not true, just return that
559    # collection.
560    if not args.force:
561        if 'portable_data_hash' in c:
562            colhash = c['portable_data_hash']
563        else:
564            colhash = c['uuid']
565        dstcol = dst.collections().list(
566            filters=[['portable_data_hash', '=', colhash]]
567        ).execute(num_retries=args.retries)
568        if dstcol['items_available'] > 0:
569            for d in dstcol['items']:
570                if ((args.project_uuid == d['owner_uuid']) and
571                    (c.get('name') == d['name']) and
572                    (c['portable_data_hash'] == d['portable_data_hash'])):
573                    return d
574            c['manifest_text'] = dst.collections().get(
575                uuid=dstcol['items'][0]['uuid']
576            ).execute(num_retries=args.retries)['manifest_text']
577            return create_collection_from(c, src, dst, args)
578
579    # Fetch the collection's manifest.
580    manifest = c['manifest_text']
581    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
582
583    # Copy each block from src_keep to dst_keep.
584    # Use the newly signed locators returned from dst_keep to build
585    # a new manifest as we go.
586    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
587    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
588    dst_manifest = io.StringIO()
589    dst_locators = {}
590    bytes_written = 0
591    bytes_expected = total_collection_size(manifest)
592    if args.progress:
593        progress_writer = ProgressWriter(human_progress)
594    else:
595        progress_writer = None
596
597    # go through the words
598    # put each block loc into 'get' queue
599    # 'get' threads get block and put it into 'put' queue
600    # 'put' threads put block and then update dst_locators
601    #
602    # after going through the whole manifest we go back through it
603    # again and build dst_manifest
604
605    lock = threading.Lock()
606
607    # the get queue should be unbounded because we'll add all the
608    # block hashes we want to get, but these are small
609    get_queue = queue.Queue()
610
611    threadcount = 4
612
613    # the put queue contains full data blocks
614    # and if 'get' is faster than 'put' we could end up consuming
615    # a great deal of RAM if it isn't bounded.
616    put_queue = queue.Queue(threadcount)
617    transfer_error = []
618
619    def get_thread():
620        while True:
621            word = get_queue.get()
622            if word is None:
623                put_queue.put(None)
624                get_queue.task_done()
625                return
626
627            blockhash = arvados.KeepLocator(word).md5sum
628            with lock:
629                if blockhash in dst_locators:
630                    # Already uploaded
631                    get_queue.task_done()
632                    continue
633
634            try:
635                logger.debug("Getting block %s", word)
636                data = src_keep.get(word)
637                put_queue.put((word, data))
638            except e:
639                logger.error("Error getting block %s: %s", word, e)
640                transfer_error.append(e)
641                try:
642                    # Drain the 'get' queue so we end early
643                    while True:
644                        get_queue.get(False)
645                        get_queue.task_done()
646                except queue.Empty:
647                    pass
648            finally:
649                get_queue.task_done()
650
651    def put_thread():
652        nonlocal bytes_written
653        while True:
654            item = put_queue.get()
655            if item is None:
656                put_queue.task_done()
657                return
658
659            word, data = item
660            loc = arvados.KeepLocator(word)
661            blockhash = loc.md5sum
662            with lock:
663                if blockhash in dst_locators:
664                    # Already uploaded
665                    put_queue.task_done()
666                    continue
667
668            try:
669                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
670                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
671                with lock:
672                    dst_locators[blockhash] = dst_locator
673                    bytes_written += loc.size
674                    if progress_writer:
675                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
676            except e:
677                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
678                try:
679                    # Drain the 'get' queue so we end early
680                    while True:
681                        get_queue.get(False)
682                        get_queue.task_done()
683                except queue.Empty:
684                    pass
685                transfer_error.append(e)
686            finally:
687                put_queue.task_done()
688
689    for line in manifest.splitlines():
690        words = line.split()
691        for word in words[1:]:
692            try:
693                loc = arvados.KeepLocator(word)
694            except ValueError:
695                # If 'word' can't be parsed as a locator,
696                # presume it's a filename.
697                continue
698
699            get_queue.put(word)
700
701    for i in range(0, threadcount):
702        get_queue.put(None)
703
704    for i in range(0, threadcount):
705        threading.Thread(target=get_thread, daemon=True).start()
706
707    for i in range(0, threadcount):
708        threading.Thread(target=put_thread, daemon=True).start()
709
710    get_queue.join()
711    put_queue.join()
712
713    if len(transfer_error) > 0:
714        return {"error_token": "Failed to transfer blocks"}
715
716    for line in manifest.splitlines():
717        words = line.split()
718        dst_manifest.write(words[0])
719        for word in words[1:]:
720            try:
721                loc = arvados.KeepLocator(word)
722            except ValueError:
723                # If 'word' can't be parsed as a locator,
724                # presume it's a filename.
725                dst_manifest.write(' ')
726                dst_manifest.write(word)
727                continue
728            blockhash = loc.md5sum
729            dst_manifest.write(' ')
730            dst_manifest.write(dst_locators[blockhash])
731        dst_manifest.write("\n")
732
733    if progress_writer:
734        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
735        progress_writer.finish()
736
737    # Copy the manifest and save the collection.
738    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
739
740    c['manifest_text'] = dst_manifest.getvalue()
741    return create_collection_from(c, src, dst, args)
742
743def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
744    r = api.repositories().list(
745        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
746    if r['items_available'] != 1:
747        raise Exception('cannot identify repo {}; {} repos found'
748                        .format(repo_name, r['items_available']))
749
750    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
751    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
752    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
753
754    priority = https_url + other_url + http_url
755
756    for url in priority:
757        if url.startswith("http"):
758            u = urllib.parse.urlsplit(url)
759            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
760            git_config = ["-c", "credential.%s/.username=none" % baseurl,
761                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
762        else:
763            git_config = []
764
765        try:
766            logger.debug("trying %s", url)
767            subprocess.run(
768                ['git', *git_config, 'ls-remote', url],
769                check=True,
770                env={
771                    'ARVADOS_API_TOKEN': api.api_token,
772                    'GIT_ASKPASS': '/bin/false',
773                    'HOME': os.environ['HOME'],
774                },
775                stdout=subprocess.DEVNULL,
776            )
777        except subprocess.CalledProcessError:
778            pass
779        else:
780            git_url = url
781            break
782    else:
783        raise Exception('Cannot access git repository, tried {}'
784                        .format(priority))
785
786    if git_url.startswith("http:"):
787        if allow_insecure_http:
788            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
789        else:
790            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
791
792    return (git_url, git_config)
793
794
795def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
796    """Copy the docker image identified by docker_image and
797    docker_image_tag from src to dst. Create appropriate
798    docker_image_repo+tag and docker_image_hash links at dst.
799
800    """
801
802    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
803
804    # Find the link identifying this docker image.
805    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
806        src, args.retries, docker_image, docker_image_tag)
807    if docker_image_list:
808        image_uuid, image_info = docker_image_list[0]
809        logger.debug('copying collection {} {}'.format(image_uuid, image_info))
810
811        # Copy the collection it refers to.
812        dst_image_col = copy_collection(image_uuid, src, dst, args)
813    elif arvados.util.keep_locator_pattern.match(docker_image):
814        dst_image_col = copy_collection(docker_image, src, dst, args)
815    else:
816        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
817
818def copy_project(obj_uuid, src, dst, owner_uuid, args):
819
820    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
821
822    # Create/update the destination project
823    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
824                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
825    if len(existing["items"]) == 0:
826        project_record = dst.groups().create(body={"group": {"group_class": "project",
827                                                             "owner_uuid": owner_uuid,
828                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
829    else:
830        project_record = existing["items"][0]
831
832    dst.groups().update(uuid=project_record["uuid"],
833                        body={"group": {
834                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)
835
836    args.project_uuid = project_record["uuid"]
837
838    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
839
840
841    partial_error = ""
842
843    # Copy collections
844    try:
845        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
846                         src, dst, args)
847    except Exception as e:
848        partial_error += "\n" + str(e)
849
850    # Copy workflows
851    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
852        try:
853            copy_workflow(w["uuid"], src, dst, args)
854        except Exception as e:
855            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
856
857    if args.recursive:
858        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
859            try:
860                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
861            except Exception as e:
862                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
863
864    project_record["partial_error"] = partial_error
865
866    return project_record
867
868# git_rev_parse(rev, repo)
869#
870#    Returns the 40-character commit hash corresponding to 'rev' in
871#    git repository 'repo' (which must be the path of a local git
872#    repository)
873#
874def git_rev_parse(rev, repo):
875    proc = subprocess.run(
876        ['git', 'rev-parse', rev],
877        check=True,
878        cwd=repo,
879        stdout=subprocess.PIPE,
880        text=True,
881    )
882    return proc.stdout.read().strip()
883
884# uuid_type(api, object_uuid)
885#
886#    Returns the name of the class that object_uuid belongs to, based on
887#    the second field of the uuid.  This function consults the api's
888#    schema to identify the object class.
889#
890#    It returns a string such as 'Collection', 'Workflow', etc.
891#
892#    Special case: if handed a Keep locator hash, return 'Collection'.
893#
894def uuid_type(api, object_uuid):
895    if re.match(arvados.util.keep_locator_pattern, object_uuid):
896        return 'Collection'
897
898    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
899        return 'httpURL'
900
901    p = object_uuid.split('-')
902    if len(p) == 3:
903        type_prefix = p[1]
904        for k in api._schema.schemas:
905            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
906            if type_prefix == obj_class:
907                return k
908    return None
909
910
911def copy_from_http(url, src, dst, args):
912
913    project_uuid = args.project_uuid
914    varying_url_params = args.varying_url_params
915    prefer_cached_downloads = args.prefer_cached_downloads
916
917    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
918                                                   varying_url_params=varying_url_params,
919                                                   prefer_cached_downloads=prefer_cached_downloads)
920    if cached[2] is not None:
921        return copy_collection(cached[2], src, dst, args)
922
923    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
924                                               varying_url_params=varying_url_params,
925                                               prefer_cached_downloads=prefer_cached_downloads)
926
927    if cached is not None:
928        return {"uuid": cached[2]}
929
930
931def abort(msg, code=1):
932    logger.info("arv-copy: %s", msg)
933    exit(code)
934
935
936# Code for reporting on the progress of a collection upload.
937# Stolen from arvados.commands.put.ArvPutCollectionWriter
938# TODO(twp): figure out how to refactor into a shared library
939# (may involve refactoring some arvados.commands.arv_copy.copy_collection
940# code)
941
942def machine_progress(obj_uuid, bytes_written, bytes_expected):
943    return "{} {}: {} {} written {} total\n".format(
944        sys.argv[0],
945        os.getpid(),
946        obj_uuid,
947        bytes_written,
948        -1 if (bytes_expected is None) else bytes_expected)
949
950def human_progress(obj_uuid, bytes_written, bytes_expected):
951    if bytes_expected:
952        return "\r{}: {}M / {}M {:.1%} ".format(
953            obj_uuid,
954            bytes_written >> 20, bytes_expected >> 20,
955            float(bytes_written) / bytes_expected)
956    else:
957        return "\r{}: {} ".format(obj_uuid, bytes_written)
958
959class ProgressWriter(object):
960    _progress_func = None
961    outfile = sys.stderr
962
963    def __init__(self, progress_func):
964        self._progress_func = progress_func
965
966    def report(self, obj_uuid, bytes_written, bytes_expected):
967        if self._progress_func is not None:
968            self.outfile.write(
969                self._progress_func(obj_uuid, bytes_written, bytes_expected))
970
971    def finish(self):
972        self.outfile.write("\n")
973
974if __name__ == '__main__':
975    main()
COMMIT_HASH_RE = re.compile('^[0-9a-f]{1,40}$')
logger = <Logger arvados.arv-copy (WARNING)>
local_repo_dir = {}
collections_copied = {}
scripts_copied = set()
src_owner_uuid = None
def main():
 77def main():
 78    copy_opts = argparse.ArgumentParser(add_help=False)
 79
 80    copy_opts.add_argument(
 81        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
 82        help='Print version and exit.')
 83    copy_opts.add_argument(
 84        '-v', '--verbose', dest='verbose', action='store_true',
 85        help='Verbose output.')
 86    copy_opts.add_argument(
 87        '--progress', dest='progress', action='store_true',
 88        help='Report progress on copying collections. (default)')
 89    copy_opts.add_argument(
 90        '--no-progress', dest='progress', action='store_false',
 91        help='Do not report progress on copying collections.')
 92    copy_opts.add_argument(
 93        '-f', '--force', dest='force', action='store_true',
 94        help='Perform copy even if the object appears to exist at the remote destination.')
 95    copy_opts.add_argument(
 96        '--src', dest='source_arvados',
 97        help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will be inferred from the UUID of the object being copied.')
 98    copy_opts.add_argument(
 99        '--dst', dest='destination_arvados',
100        help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.  If not provided, will use ARVADOS_API_HOST from environment.')
101    copy_opts.add_argument(
102        '--recursive', dest='recursive', action='store_true',
103        help='Recursively copy any dependencies for this object, and subprojects. (default)')
104    copy_opts.add_argument(
105        '--no-recursive', dest='recursive', action='store_false',
106        help='Do not copy any dependencies or subprojects.')
107    copy_opts.add_argument(
108        '--project-uuid', dest='project_uuid',
109        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
110    copy_opts.add_argument(
111        '--storage-classes', dest='storage_classes',
112        help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.')
113    copy_opts.add_argument("--varying-url-params", type=str, default="",
114                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
115
116    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
117                        help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
118
119    copy_opts.add_argument(
120        'object_uuid',
121        help='The UUID of the object to be copied.')
122    copy_opts.set_defaults(progress=True)
123    copy_opts.set_defaults(recursive=True)
124
125    parser = argparse.ArgumentParser(
126        description='Copy a workflow, collection or project from one Arvados instance to another.  On success, the uuid of the copied object is printed to stdout.',
127        parents=[copy_opts, arv_cmd.retry_opt])
128    args = parser.parse_args()
129
130    if args.storage_classes:
131        args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]
132
133    if args.verbose:
134        logger.setLevel(logging.DEBUG)
135    else:
136        logger.setLevel(logging.INFO)
137
138    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
139        args.source_arvados = args.object_uuid[:5]
140
141    # Create API clients for the source and destination instances
142    src_arv = api_for_instance(args.source_arvados, args.retries)
143    dst_arv = api_for_instance(args.destination_arvados, args.retries)
144
145    if not args.project_uuid:
146        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]
147
148    # Identify the kind of object we have been given, and begin copying.
149    t = uuid_type(src_arv, args.object_uuid)
150
151    try:
152        if t == 'Collection':
153            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
154            result = copy_collection(args.object_uuid,
155                                     src_arv, dst_arv,
156                                     args)
157        elif t == 'Workflow':
158            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
159            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
160        elif t == 'Group':
161            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
162            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
163        elif t == 'httpURL':
164            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
165        else:
166            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
167    except Exception as e:
168        logger.error("%s", e, exc_info=args.verbose)
169        exit(1)
170
171    # Clean up any outstanding temp git repositories.
172    for d in listvalues(local_repo_dir):
173        shutil.rmtree(d, ignore_errors=True)
174
175    if not result:
176        exit(1)
177
178    # If no exception was thrown and the response does not have an
179    # error_token field, presume success
180    if result is None or 'error_token' in result or 'uuid' not in result:
181        if result:
182            logger.error("API server returned an error result: {}".format(result))
183        exit(1)
184
185    print(result['uuid'])
186
187    if result.get('partial_error'):
188        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
189        exit(1)
190
191    logger.info("Success: created copy with uuid {}".format(result['uuid']))
192    exit(0)
def set_src_owner_uuid(resource, uuid, args):
194def set_src_owner_uuid(resource, uuid, args):
195    global src_owner_uuid
196    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
197    src_owner_uuid = c.get("owner_uuid")
def api_for_instance(instance_name, num_retries):
211def api_for_instance(instance_name, num_retries):
212    if not instance_name:
213        # Use environment
214        return arvados.api('v1')
215
216    if '/' in instance_name:
217        config_file = instance_name
218    else:
219        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
220
221    try:
222        cfg = arvados.config.load(config_file)
223    except (IOError, OSError) as e:
224        abort(("Could not open config file {}: {}\n" +
225               "You must make sure that your configuration tokens\n" +
226               "for Arvados instance {} are in {} and that this\n" +
227               "file is readable.").format(
228                   config_file, e, instance_name, config_file))
229
230    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
231        api_is_insecure = (
232            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
233                ['1', 't', 'true', 'y', 'yes']))
234        client = arvados.api('v1',
235                             host=cfg['ARVADOS_API_HOST'],
236                             token=cfg['ARVADOS_API_TOKEN'],
237                             insecure=api_is_insecure,
238                             num_retries=num_retries,
239                             )
240    else:
241        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
242    return client
def check_git_availability():
245def check_git_availability():
246    try:
247        subprocess.run(
248            ['git', '--version'],
249            check=True,
250            stdout=subprocess.DEVNULL,
251        )
252    except FileNotFoundError:
253        abort('git command is not available. Please ensure git is installed.')
def filter_iter(arg):
256def filter_iter(arg):
257    """Iterate a filter string-or-list.
258
259    Pass in a filter field that can either be a string or list.
260    This will iterate elements as if the field had been written as a list.
261    """
262    if isinstance(arg, basestring):
263        return iter((arg,))
264    else:
265        return iter(arg)

Iterate a filter string-or-list.

Pass in a filter field that can either be a string or list. This will iterate elements as if the field had been written as a list.

def migrate_repository_filter(repo_filter, src_repository, dst_repository):
267def migrate_repository_filter(repo_filter, src_repository, dst_repository):
268    """Update a single repository filter in-place for the destination.
269
270    If the filter checks that the repository is src_repository, it is
271    updated to check that the repository is dst_repository.  If it does
272    anything else, this function raises ValueError.
273    """
274    if src_repository is None:
275        raise ValueError("component does not specify a source repository")
276    elif dst_repository is None:
277        raise ValueError("no destination repository specified to update repository filter")
278    elif repo_filter[1:] == ['=', src_repository]:
279        repo_filter[2] = dst_repository
280    elif repo_filter[1:] == ['in', [src_repository]]:
281        repo_filter[2] = [dst_repository]
282    else:
283        raise ValueError("repository filter is not a simple source match")

Update a single repository filter in-place for the destination.

If the filter checks that the repository is src_repository, it is updated to check that the repository is dst_repository. If it does anything else, this function raises ValueError.

def migrate_script_version_filter(version_filter):
285def migrate_script_version_filter(version_filter):
286    """Update a single script_version filter in-place for the destination.
287
288    Currently this function checks that all the filter operands are Git
289    commit hashes.  If they're not, it raises ValueError to indicate that
290    the filter is not portable.  It could be extended to make other
291    transformations in the future.
292    """
293    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
294        raise ValueError("script_version filter is not limited to commit hashes")

Update a single script_version filter in-place for the destination.

Currently this function checks that all the filter operands are Git commit hashes. If they’re not, it raises ValueError to indicate that the filter is not portable. It could be extended to make other transformations in the future.

def attr_filtered(filter_, *attr_names):
296def attr_filtered(filter_, *attr_names):
297    """Return True if filter_ applies to any of attr_names, else False."""
298    return any((name == 'any') or (name in attr_names)
299               for name in filter_iter(filter_[0]))

Return True if filter_ applies to any of attr_names, else False.

@contextlib.contextmanager
def exception_handler(handler, *exc_types):
301@contextlib.contextmanager
302def exception_handler(handler, *exc_types):
303    """If any exc_types are raised in the block, call handler on the exception."""
304    try:
305        yield
306    except exc_types as error:
307        handler(error)

If any exc_types are raised in the block, call handler on the exception.

def copy_workflow(wf_uuid, src, dst, args):
322def copy_workflow(wf_uuid, src, dst, args):
323    # fetch the workflow from the source instance
324    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
325
326    if not wf["definition"]:
327        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
328
329    # copy collections and docker images
330    if args.recursive and wf["definition"]:
331        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
332               "ARVADOS_API_TOKEN": src.api_token,
333               "PATH": os.environ["PATH"]}
334        try:
335            result = subprocess.run(
336                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
337                env=env,
338                stdout=subprocess.PIPE,
339                universal_newlines=True,
340            )
341        except FileNotFoundError:
342            no_arv_copy = True
343        else:
344            no_arv_copy = result.returncode == 2
345
346        if no_arv_copy:
347            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
348        elif result.returncode != 0:
349            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')
350
351        locations = json.loads(result.stdout)
352
353        if locations:
354            copy_collections(locations, src, dst, args)
355
356    # copy the workflow itself
357    del wf['uuid']
358    wf['owner_uuid'] = args.project_uuid
359
360    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
361                                             ["name", "=", wf["name"]]]).execute()
362    if len(existing["items"]) == 0:
363        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
364    else:
365        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
def workflow_collections(obj, locations, docker_images):
368def workflow_collections(obj, locations, docker_images):
369    if isinstance(obj, dict):
370        loc = obj.get('location', None)
371        if loc is not None:
372            if loc.startswith("keep:"):
373                locations.append(loc[5:])
374
375        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
376        if docker_image is not None:
377            ds = docker_image.split(":", 1)
378            tag = ds[1] if len(ds)==2 else 'latest'
379            docker_images[ds[0]] = tag
380
381        for x in obj:
382            workflow_collections(obj[x], locations, docker_images)
383    elif isinstance(obj, list):
384        for x in obj:
385            workflow_collections(x, locations, docker_images)
def copy_collections(obj, src, dst, args):
399def copy_collections(obj, src, dst, args):
400
401    def copy_collection_fn(collection_match):
402        """Helper function for regex substitution: copies a single collection,
403        identified by the collection_match MatchObject, to the
404        destination.  Returns the destination collection uuid (or the
405        portable data hash if that's what src_id is).
406
407        """
408        src_id = collection_match.group(0)
409        if src_id not in collections_copied:
410            dst_col = copy_collection(src_id, src, dst, args)
411            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
412                collections_copied[src_id] = src_id
413            else:
414                collections_copied[src_id] = dst_col['uuid']
415        return collections_copied[src_id]
416
417    if isinstance(obj, basestring):
418        # Copy any collections identified in this string to dst, replacing
419        # them with the dst uuids as necessary.
420        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
421        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
422        return obj
423    elif isinstance(obj, dict):
424        return type(obj)((v, copy_collections(obj[v], src, dst, args))
425                         for v in obj)
426    elif isinstance(obj, list):
427        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
428    return obj
def total_collection_size(manifest_text):
431def total_collection_size(manifest_text):
432    """Return the total number of bytes in this collection (excluding
433    duplicate blocks)."""
434
435    total_bytes = 0
436    locators_seen = {}
437    for line in manifest_text.splitlines():
438        words = line.split()
439        for word in words[1:]:
440            try:
441                loc = arvados.KeepLocator(word)
442            except ValueError:
443                continue  # this word isn't a locator, skip it
444            if loc.md5sum not in locators_seen:
445                locators_seen[loc.md5sum] = True
446                total_bytes += loc.size
447
448    return total_bytes

Return the total number of bytes in this collection (excluding duplicate blocks).

def create_collection_from(c, src, dst, args):
450def create_collection_from(c, src, dst, args):
451    """Create a new collection record on dst, and copy Docker metadata if
452    available."""
453
454    collection_uuid = c['uuid']
455    body = {}
456    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
457        body[d] = c[d]
458
459    if not body["name"]:
460        body['name'] = "copied from " + collection_uuid
461
462    if args.storage_classes:
463        body['storage_classes_desired'] = args.storage_classes
464
465    body['owner_uuid'] = args.project_uuid
466
467    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)
468
469    # Create docker_image_repo+tag and docker_image_hash links
470    # at the destination.
471    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
472        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']
473
474        for src_link in docker_links:
475            body = {key: src_link[key]
476                    for key in ['link_class', 'name', 'properties']}
477            body['head_uuid'] = dst_collection['uuid']
478            body['owner_uuid'] = args.project_uuid
479
480            lk = dst.links().create(body=body).execute(num_retries=args.retries)
481            logger.debug('created dst link {}'.format(lk))
482
483    return dst_collection

Create a new collection record on dst, and copy Docker metadata if available.

def copy_collection(obj_uuid, src, dst, args):
507def copy_collection(obj_uuid, src, dst, args):
508    if arvados.util.keep_locator_pattern.match(obj_uuid):
509        # If the obj_uuid is a portable data hash, it might not be
510        # uniquely identified with a particular collection.  As a
511        # result, it is ambiguous as to what name to use for the copy.
512        # Apply some heuristics to pick which collection to get the
513        # name from.
514        srccol = src.collections().list(
515            filters=[['portable_data_hash', '=', obj_uuid]],
516            order="created_at asc"
517            ).execute(num_retries=args.retries)
518
519        items = srccol.get("items")
520
521        if not items:
522            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
523            return
524
525        c = None
526
527        if len(items) == 1:
528            # There's only one collection with the PDH, so use that.
529            c = items[0]
530        if not c:
531            # See if there is a collection that's in the same project
532            # as the root item (usually a workflow) being copied.
533            for i in items:
534                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
535                    c = i
536                    break
537        if not c:
538            # Didn't find any collections located in the same project, so
539            # pick the oldest collection that has a name assigned to it.
540            for i in items:
541                if i.get("name"):
542                    c = i
543                    break
544        if not c:
545            # None of the collections have names (?!), so just pick the
546            # first one.
547            c = items[0]
548
549        # list() doesn't return manifest text (and we don't want it to,
550        # because we don't need the same maninfest text sent to us 50
551        # times) so go and retrieve the collection object directly
552        # which will include the manifest text.
553        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
554    else:
555        # Assume this is an actual collection uuid, so fetch it directly.
556        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)
557
558    # If a collection with this hash already exists at the
559    # destination, and 'force' is not true, just return that
560    # collection.
561    if not args.force:
562        if 'portable_data_hash' in c:
563            colhash = c['portable_data_hash']
564        else:
565            colhash = c['uuid']
566        dstcol = dst.collections().list(
567            filters=[['portable_data_hash', '=', colhash]]
568        ).execute(num_retries=args.retries)
569        if dstcol['items_available'] > 0:
570            for d in dstcol['items']:
571                if ((args.project_uuid == d['owner_uuid']) and
572                    (c.get('name') == d['name']) and
573                    (c['portable_data_hash'] == d['portable_data_hash'])):
574                    return d
575            c['manifest_text'] = dst.collections().get(
576                uuid=dstcol['items'][0]['uuid']
577            ).execute(num_retries=args.retries)['manifest_text']
578            return create_collection_from(c, src, dst, args)
579
580    # Fetch the collection's manifest.
581    manifest = c['manifest_text']
582    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)
583
584    # Copy each block from src_keep to dst_keep.
585    # Use the newly signed locators returned from dst_keep to build
586    # a new manifest as we go.
587    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
588    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
589    dst_manifest = io.StringIO()
590    dst_locators = {}
591    bytes_written = 0
592    bytes_expected = total_collection_size(manifest)
593    if args.progress:
594        progress_writer = ProgressWriter(human_progress)
595    else:
596        progress_writer = None
597
598    # go through the words
599    # put each block loc into 'get' queue
600    # 'get' threads get block and put it into 'put' queue
601    # 'put' threads put block and then update dst_locators
602    #
603    # after going through the whole manifest we go back through it
604    # again and build dst_manifest
605
606    lock = threading.Lock()
607
608    # the get queue should be unbounded because we'll add all the
609    # block hashes we want to get, but these are small
610    get_queue = queue.Queue()
611
612    threadcount = 4
613
614    # the put queue contains full data blocks
615    # and if 'get' is faster than 'put' we could end up consuming
616    # a great deal of RAM if it isn't bounded.
617    put_queue = queue.Queue(threadcount)
618    transfer_error = []
619
620    def get_thread():
621        while True:
622            word = get_queue.get()
623            if word is None:
624                put_queue.put(None)
625                get_queue.task_done()
626                return
627
628            blockhash = arvados.KeepLocator(word).md5sum
629            with lock:
630                if blockhash in dst_locators:
631                    # Already uploaded
632                    get_queue.task_done()
633                    continue
634
635            try:
636                logger.debug("Getting block %s", word)
637                data = src_keep.get(word)
638                put_queue.put((word, data))
639            except e:
640                logger.error("Error getting block %s: %s", word, e)
641                transfer_error.append(e)
642                try:
643                    # Drain the 'get' queue so we end early
644                    while True:
645                        get_queue.get(False)
646                        get_queue.task_done()
647                except queue.Empty:
648                    pass
649            finally:
650                get_queue.task_done()
651
652    def put_thread():
653        nonlocal bytes_written
654        while True:
655            item = put_queue.get()
656            if item is None:
657                put_queue.task_done()
658                return
659
660            word, data = item
661            loc = arvados.KeepLocator(word)
662            blockhash = loc.md5sum
663            with lock:
664                if blockhash in dst_locators:
665                    # Already uploaded
666                    put_queue.task_done()
667                    continue
668
669            try:
670                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
671                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
672                with lock:
673                    dst_locators[blockhash] = dst_locator
674                    bytes_written += loc.size
675                    if progress_writer:
676                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
677            except e:
678                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
679                try:
680                    # Drain the 'get' queue so we end early
681                    while True:
682                        get_queue.get(False)
683                        get_queue.task_done()
684                except queue.Empty:
685                    pass
686                transfer_error.append(e)
687            finally:
688                put_queue.task_done()
689
690    for line in manifest.splitlines():
691        words = line.split()
692        for word in words[1:]:
693            try:
694                loc = arvados.KeepLocator(word)
695            except ValueError:
696                # If 'word' can't be parsed as a locator,
697                # presume it's a filename.
698                continue
699
700            get_queue.put(word)
701
702    for i in range(0, threadcount):
703        get_queue.put(None)
704
705    for i in range(0, threadcount):
706        threading.Thread(target=get_thread, daemon=True).start()
707
708    for i in range(0, threadcount):
709        threading.Thread(target=put_thread, daemon=True).start()
710
711    get_queue.join()
712    put_queue.join()
713
714    if len(transfer_error) > 0:
715        return {"error_token": "Failed to transfer blocks"}
716
717    for line in manifest.splitlines():
718        words = line.split()
719        dst_manifest.write(words[0])
720        for word in words[1:]:
721            try:
722                loc = arvados.KeepLocator(word)
723            except ValueError:
724                # If 'word' can't be parsed as a locator,
725                # presume it's a filename.
726                dst_manifest.write(' ')
727                dst_manifest.write(word)
728                continue
729            blockhash = loc.md5sum
730            dst_manifest.write(' ')
731            dst_manifest.write(dst_locators[blockhash])
732        dst_manifest.write("\n")
733
734    if progress_writer:
735        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
736        progress_writer.finish()
737
738    # Copy the manifest and save the collection.
739    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
740
741    c['manifest_text'] = dst_manifest.getvalue()
742    return create_collection_from(c, src, dst, args)
def select_git_url( api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
744def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
745    r = api.repositories().list(
746        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
747    if r['items_available'] != 1:
748        raise Exception('cannot identify repo {}; {} repos found'
749                        .format(repo_name, r['items_available']))
750
751    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
752    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
753    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]
754
755    priority = https_url + other_url + http_url
756
757    for url in priority:
758        if url.startswith("http"):
759            u = urllib.parse.urlsplit(url)
760            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
761            git_config = ["-c", "credential.%s/.username=none" % baseurl,
762                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
763        else:
764            git_config = []
765
766        try:
767            logger.debug("trying %s", url)
768            subprocess.run(
769                ['git', *git_config, 'ls-remote', url],
770                check=True,
771                env={
772                    'ARVADOS_API_TOKEN': api.api_token,
773                    'GIT_ASKPASS': '/bin/false',
774                    'HOME': os.environ['HOME'],
775                },
776                stdout=subprocess.DEVNULL,
777            )
778        except subprocess.CalledProcessError:
779            pass
780        else:
781            git_url = url
782            break
783    else:
784        raise Exception('Cannot access git repository, tried {}'
785                        .format(priority))
786
787    if git_url.startswith("http:"):
788        if allow_insecure_http:
789            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
790        else:
791            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))
792
793    return (git_url, git_config)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
796def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
797    """Copy the docker image identified by docker_image and
798    docker_image_tag from src to dst. Create appropriate
799    docker_image_repo+tag and docker_image_hash links at dst.
800
801    """
802
803    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))
804
805    # Find the link identifying this docker image.
806    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
807        src, args.retries, docker_image, docker_image_tag)
808    if docker_image_list:
809        image_uuid, image_info = docker_image_list[0]
810        logger.debug('copying collection {} {}'.format(image_uuid, image_info))
811
812        # Copy the collection it refers to.
813        dst_image_col = copy_collection(image_uuid, src, dst, args)
814    elif arvados.util.keep_locator_pattern.match(docker_image):
815        dst_image_col = copy_collection(docker_image, src, dst, args)
816    else:
817        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

Copy the docker image identified by docker_image and docker_image_tag from src to dst. Create appropriate docker_image_repo+tag and docker_image_hash links at dst.

def copy_project(obj_uuid, src, dst, owner_uuid, args):
819def copy_project(obj_uuid, src, dst, owner_uuid, args):
820
821    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
822
823    # Create/update the destination project
824    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
825                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
826    if len(existing["items"]) == 0:
827        project_record = dst.groups().create(body={"group": {"group_class": "project",
828                                                             "owner_uuid": owner_uuid,
829                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
830    else:
831        project_record = existing["items"][0]
832
833    dst.groups().update(uuid=project_record["uuid"],
834                        body={"group": {
835                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)
836
837    args.project_uuid = project_record["uuid"]
838
839    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
840
841
842    partial_error = ""
843
844    # Copy collections
845    try:
846        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
847                         src, dst, args)
848    except Exception as e:
849        partial_error += "\n" + str(e)
850
851    # Copy workflows
852    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
853        try:
854            copy_workflow(w["uuid"], src, dst, args)
855        except Exception as e:
856            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
857
858    if args.recursive:
859        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
860            try:
861                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
862            except Exception as e:
863                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
864
865    project_record["partial_error"] = partial_error
866
867    return project_record
def git_rev_parse(rev, repo):
875def git_rev_parse(rev, repo):
876    proc = subprocess.run(
877        ['git', 'rev-parse', rev],
878        check=True,
879        cwd=repo,
880        stdout=subprocess.PIPE,
881        text=True,
882    )
883    return proc.stdout.read().strip()
def uuid_type(api, object_uuid):
895def uuid_type(api, object_uuid):
896    if re.match(arvados.util.keep_locator_pattern, object_uuid):
897        return 'Collection'
898
899    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
900        return 'httpURL'
901
902    p = object_uuid.split('-')
903    if len(p) == 3:
904        type_prefix = p[1]
905        for k in api._schema.schemas:
906            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
907            if type_prefix == obj_class:
908                return k
909    return None
def copy_from_http(url, src, dst, args):
912def copy_from_http(url, src, dst, args):
913
914    project_uuid = args.project_uuid
915    varying_url_params = args.varying_url_params
916    prefer_cached_downloads = args.prefer_cached_downloads
917
918    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
919                                                   varying_url_params=varying_url_params,
920                                                   prefer_cached_downloads=prefer_cached_downloads)
921    if cached[2] is not None:
922        return copy_collection(cached[2], src, dst, args)
923
924    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
925                                               varying_url_params=varying_url_params,
926                                               prefer_cached_downloads=prefer_cached_downloads)
927
928    if cached is not None:
929        return {"uuid": cached[2]}
def abort(msg, code=1):
932def abort(msg, code=1):
933    logger.info("arv-copy: %s", msg)
934    exit(code)
def machine_progress(obj_uuid, bytes_written, bytes_expected):
943def machine_progress(obj_uuid, bytes_written, bytes_expected):
944    return "{} {}: {} {} written {} total\n".format(
945        sys.argv[0],
946        os.getpid(),
947        obj_uuid,
948        bytes_written,
949        -1 if (bytes_expected is None) else bytes_expected)
def human_progress(obj_uuid, bytes_written, bytes_expected):
951def human_progress(obj_uuid, bytes_written, bytes_expected):
952    if bytes_expected:
953        return "\r{}: {}M / {}M {:.1%} ".format(
954            obj_uuid,
955            bytes_written >> 20, bytes_expected >> 20,
956            float(bytes_written) / bytes_expected)
957    else:
958        return "\r{}: {} ".format(obj_uuid, bytes_written)
class ProgressWriter:
960class ProgressWriter(object):
961    _progress_func = None
962    outfile = sys.stderr
963
964    def __init__(self, progress_func):
965        self._progress_func = progress_func
966
967    def report(self, obj_uuid, bytes_written, bytes_expected):
968        if self._progress_func is not None:
969            self.outfile.write(
970                self._progress_func(obj_uuid, bytes_written, bytes_expected))
971
972    def finish(self):
973        self.outfile.write("\n")
ProgressWriter(progress_func)
964    def __init__(self, progress_func):
965        self._progress_func = progress_func
outfile = <_io.StringIO object>
def report(self, obj_uuid, bytes_written, bytes_expected):
967    def report(self, obj_uuid, bytes_written, bytes_expected):
968        if self._progress_func is not None:
969            self.outfile.write(
970                self._progress_func(obj_uuid, bytes_written, bytes_expected))
def finish(self):
972    def finish(self):
973        self.outfile.write("\n")