arvados.commands.arv_copy
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# arv-copy [--recursive] [--no-recursive] object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a workflow, arv-copy copies the workflow,
# input collections, and docker images). If
# --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have files $HOME/.config/arvados/{src}.conf and
# $HOME/.config/arvados/{dst}.conf with valid login credentials for
# instances src and dst. If either of these files is not found,
# arv-copy will issue an error.
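#
# A minimal sketch of such a config file (host, token, and the instance
# name "foo" are hypothetical; api_for_instance() below reads exactly
# these keys):
#
#   $ cat $HOME/.config/arvados/foo.conf
#   ARVADOS_API_HOST=foo.arvadosapi.com
#   ARVADOS_API_TOKEN=v2/foo-gj3su-xxxxxxxxxxxxxxx/xxxxxxxxxxxxxxxxxxxx
#   ARVADOS_API_HOST_INSECURE=false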

from __future__ import division
from future import standard_library
from future.utils import listvalues
standard_library.install_aliases()
from past.builtins import basestring
from builtins import object
import argparse
import contextlib
import getpass
import os
import re
import shutil
import subprocess
import sys
import logging
import tempfile
import urllib.parse
import io
import json
import queue
import threading

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import arvados.http_to_keep
import ruamel.yaml as yaml

from arvados._version import __version__

COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# directories.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
#
local_repo_dir = {}

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None

def main():
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--src', dest='source_arvados',
        help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will be inferred from the UUID of the object being copied.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados',
        help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will use ARVADOS_API_HOST from environment.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object, and subprojects. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies or subprojects.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
    copy_opts.add_argument(
        '--storage-classes', dest='storage_classes',
        help='Comma separated list of storage classes to be used when saving data to the destination Arvados instance.')
    copy_opts.add_argument("--varying-url-params", type=str, default="",
                           help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")

    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
                           help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")

    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.storage_classes:
        args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x]

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
        args.source_arvados = args.object_uuid[:5]

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados, args.retries)
    dst_arv = api_for_instance(args.destination_arvados, args.retries)

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)

    try:
        if t == 'Collection':
            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
            result = copy_collection(args.object_uuid,
                                     src_arv, dst_arv,
                                     args)
        elif t == 'Workflow':
            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
        elif t == 'Group':
            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
        elif t == 'httpURL':
            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
        else:
            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
    except Exception as e:
        logger.error("%s", e, exc_info=args.verbose)
        exit(1)
    # Clean up any outstanding temp git repositories.
    for d in listvalues(local_repo_dir):
        shutil.rmtree(d, ignore_errors=True)

    if not result:
        exit(1)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if result is None or 'error_token' in result or 'uuid' not in result:
        if result:
            logger.error("API server returned an error result: {}".format(result))
        exit(1)

    print(result['uuid'])

    if result.get('partial_error'):
        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)

def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")

# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# settings.
#
# Otherwise, it is presumed to be the name of a file in
# $HOME/.config/arvados/instance_name.conf
#
def api_for_instance(instance_name, num_retries):
    if not instance_name:
        # Use environment
        return arvados.api('v1')

    if '/' in instance_name:
        config_file = instance_name
    else:
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             num_retries=num_retries,
                             )
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client
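
# For illustration (the instance name "foo" is hypothetical), both of
# these resolve to the same config file and return equivalent clients:
#
#   api_for_instance('foo', 3)
#   api_for_instance(os.environ['HOME'] + '/.config/arvados/foo.conf', 3)
#
# while api_for_instance(None, 3) falls back to the ARVADOS_API_HOST and
# ARVADOS_API_TOKEN environment variables.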
272 """ 273 if src_repository is None: 274 raise ValueError("component does not specify a source repository") 275 elif dst_repository is None: 276 raise ValueError("no destination repository specified to update repository filter") 277 elif repo_filter[1:] == ['=', src_repository]: 278 repo_filter[2] = dst_repository 279 elif repo_filter[1:] == ['in', [src_repository]]: 280 repo_filter[2] = [dst_repository] 281 else: 282 raise ValueError("repository filter is not a simple source match") 283 284def migrate_script_version_filter(version_filter): 285 """Update a single script_version filter in-place for the destination. 286 287 Currently this function checks that all the filter operands are Git 288 commit hashes. If they're not, it raises ValueError to indicate that 289 the filter is not portable. It could be extended to make other 290 transformations in the future. 291 """ 292 if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])): 293 raise ValueError("script_version filter is not limited to commit hashes") 294 295def attr_filtered(filter_, *attr_names): 296 """Return True if filter_ applies to any of attr_names, else False.""" 297 return any((name == 'any') or (name in attr_names) 298 for name in filter_iter(filter_[0])) 299 300@contextlib.contextmanager 301def exception_handler(handler, *exc_types): 302 """If any exc_types are raised in the block, call handler on the exception.""" 303 try: 304 yield 305 except exc_types as error: 306 handler(error) 307 308 309# copy_workflow(wf_uuid, src, dst, args) 310# 311# Copies a workflow identified by wf_uuid from src to dst. 312# 313# If args.recursive is True, also copy any collections 314# referenced in the workflow definition yaml. 315# 316# The owner_uuid of the new workflow is set to any given 317# project_uuid or the user who copied the template. 318# 319# Returns the copied workflow object. 

# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        try:
            result = subprocess.run(
                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
                env=env,
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
        except FileNotFoundError:
            no_arv_copy = True
        else:
            no_arv_copy = result.returncode == 2

        if no_arv_copy:
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        locations = json.loads(result.stdout)

        if locations:
            copy_collections(locations, src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid

    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    else:
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)


def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
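
# For example, walking a CWL-style fragment like the following (values
# are illustrative) appends
# '99999999999999999999999999999999+99/input.txt' to locations and
# records {'ubuntu': '20.04'} in docker_images:
#
#   {
#     "location": "keep:99999999999999999999999999999999+99/input.txt",
#     "hints": [{"dockerPull": "ubuntu:20.04"}]
#   }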
405 406 """ 407 src_id = collection_match.group(0) 408 if src_id not in collections_copied: 409 dst_col = copy_collection(src_id, src, dst, args) 410 if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]: 411 collections_copied[src_id] = src_id 412 else: 413 collections_copied[src_id] = dst_col['uuid'] 414 return collections_copied[src_id] 415 416 if isinstance(obj, basestring): 417 # Copy any collections identified in this string to dst, replacing 418 # them with the dst uuids as necessary. 419 obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj) 420 obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj) 421 return obj 422 elif isinstance(obj, dict): 423 return type(obj)((v, copy_collections(obj[v], src, dst, args)) 424 for v in obj) 425 elif isinstance(obj, list): 426 return type(obj)(copy_collections(v, src, dst, args) for v in obj) 427 return obj 428 429 430def total_collection_size(manifest_text): 431 """Return the total number of bytes in this collection (excluding 432 duplicate blocks).""" 433 434 total_bytes = 0 435 locators_seen = {} 436 for line in manifest_text.splitlines(): 437 words = line.split() 438 for word in words[1:]: 439 try: 440 loc = arvados.KeepLocator(word) 441 except ValueError: 442 continue # this word isn't a locator, skip it 443 if loc.md5sum not in locators_seen: 444 locators_seen[loc.md5sum] = True 445 total_bytes += loc.size 446 447 return total_bytes 448 449def create_collection_from(c, src, dst, args): 450 """Create a new collection record on dst, and copy Docker metadata if 451 available.""" 452 453 collection_uuid = c['uuid'] 454 body = {} 455 for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'): 456 body[d] = c[d] 457 458 if not body["name"]: 459 body['name'] = "copied from " + collection_uuid 460 461 if args.storage_classes: 462 body['storage_classes_desired'] = args.storage_classes 463 464 body['owner_uuid'] = args.project_uuid 465 466 dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries) 467 468 # Create docker_image_repo+tag and docker_image_hash links 469 # at the destination. 470 for link_class in ("docker_image_repo+tag", "docker_image_hash"): 471 docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items'] 472 473 for src_link in docker_links: 474 body = {key: src_link[key] 475 for key in ['link_class', 'name', 'properties']} 476 body['head_uuid'] = dst_collection['uuid'] 477 body['owner_uuid'] = args.project_uuid 478 479 lk = dst.links().create(body=body).execute(num_retries=args.retries) 480 logger.debug('created dst link {}'.format(lk)) 481 482 return dst_collection 483 484# copy_collection(obj_uuid, src, dst, args) 485# 486# Copies the collection identified by obj_uuid from src to dst. 487# Returns the collection object created at dst. 488# 489# If args.progress is True, produce a human-friendly progress 490# report. 491# 492# If a collection with the desired portable_data_hash already 493# exists at dst, and args.force is False, copy_collection returns 494# the existing collection without copying any blocks. Otherwise 495# (if no collection exists or if args.force is True) 496# copy_collection copies all of the collection data blocks from src 497# to dst. 

def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    body = {}
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        body[d] = c[d]

    if not body["name"]:
        body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection

# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks.  Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest

    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    threadcount = 4

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()

    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()

    for line in manifest.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                continue

            get_queue.put(word)

    for i in range(0, threadcount):
        get_queue.put(None)

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    get_queue.join()
    put_queue.join()

    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest.write(words[0])
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
                continue
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)
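
# To sketch why this preserves the portable data hash (the signatures
# below are fabricated placeholders): a source manifest line such as
#
#   . acbd18db4cc2f85cedef654fccc4a4d8+3+A<src-sig>@65f76d28 0:3:foo.txt
#
# is rewritten with the same block hash and size but a fresh signature
# from dst_keep:
#
#   . acbd18db4cc2f85cedef654fccc4a4d8+3+A<dst-sig>@66a1b42c 0:3:foo.txt
#
# The portable data hash is computed over the manifest text with the
# signature hints stripped, so it comes out the same at the destination.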

def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    for url in priority:
        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            subprocess.run(
                ['git', *git_config, 'ls-remote', url],
                check=True,
                env={
                    'ARVADOS_API_TOKEN': api.api_token,
                    'GIT_ASKPASS': '/bin/false',
                    'HOME': os.environ['HOME'],
                },
                stdout=subprocess.DEVNULL,
            )
        except subprocess.CalledProcessError:
            pass
        else:
            git_url = url
            break
    else:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)


def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst.  Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

def copy_project(obj_uuid, src, dst, owner_uuid, args):

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    else:
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        body={"group": {
                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record

# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository)
#
def git_rev_parse(rev, repo):
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        text=True,
    )
    # With text=True, proc.stdout is already a string.
    return proc.stdout.strip()
799 800 """ 801 802 logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag)) 803 804 # Find the link identifying this docker image. 805 docker_image_list = arvados.commands.keepdocker.list_images_in_arv( 806 src, args.retries, docker_image, docker_image_tag) 807 if docker_image_list: 808 image_uuid, image_info = docker_image_list[0] 809 logger.debug('copying collection {} {}'.format(image_uuid, image_info)) 810 811 # Copy the collection it refers to. 812 dst_image_col = copy_collection(image_uuid, src, dst, args) 813 elif arvados.util.keep_locator_pattern.match(docker_image): 814 dst_image_col = copy_collection(docker_image, src, dst, args) 815 else: 816 logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag)) 817 818def copy_project(obj_uuid, src, dst, owner_uuid, args): 819 820 src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries) 821 822 # Create/update the destination project 823 existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid], 824 ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries) 825 if len(existing["items"]) == 0: 826 project_record = dst.groups().create(body={"group": {"group_class": "project", 827 "owner_uuid": owner_uuid, 828 "name": src_project_record["name"]}}).execute(num_retries=args.retries) 829 else: 830 project_record = existing["items"][0] 831 832 dst.groups().update(uuid=project_record["uuid"], 833 body={"group": { 834 "description": src_project_record["description"]}}).execute(num_retries=args.retries) 835 836 args.project_uuid = project_record["uuid"] 837 838 logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"]) 839 840 841 partial_error = "" 842 843 # Copy collections 844 try: 845 copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])], 846 src, dst, args) 847 except Exception as e: 848 partial_error += "\n" + str(e) 849 850 # Copy workflows 851 for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]): 852 try: 853 copy_workflow(w["uuid"], src, dst, args) 854 except Exception as e: 855 partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e) 856 857 if args.recursive: 858 for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]): 859 try: 860 copy_project(g["uuid"], src, dst, project_record["uuid"], args) 861 except Exception as e: 862 partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e) 863 864 project_record["partial_error"] = partial_error 865 866 return project_record 867 868# git_rev_parse(rev, repo) 869# 870# Returns the 40-character commit hash corresponding to 'rev' in 871# git repository 'repo' (which must be the path of a local git 872# repository) 873# 874def git_rev_parse(rev, repo): 875 proc = subprocess.run( 876 ['git', 'rev-parse', rev], 877 check=True, 878 cwd=repo, 879 stdout=subprocess.PIPE, 880 text=True, 881 ) 882 return proc.stdout.read().strip() 883 884# uuid_type(api, object_uuid) 885# 886# Returns the name of the class that object_uuid belongs to, based on 887# the second field of the uuid. This function consults the api's 888# schema to identify the object class. 889# 890# It returns a string such as 'Collection', 'Workflow', etc. 891# 892# Special case: if handed a Keep locator hash, return 'Collection'. 

def copy_from_http(url, src, dst, args):

    project_uuid = args.project_uuid
    varying_url_params = args.varying_url_params
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
                                                   varying_url_params=varying_url_params,
                                                   prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
                                               varying_url_params=varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}


def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)


# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)

class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

if __name__ == '__main__':
    main()
77def main(): 78 copy_opts = argparse.ArgumentParser(add_help=False) 79 80 copy_opts.add_argument( 81 '--version', action='version', version="%s %s" % (sys.argv[0], __version__), 82 help='Print version and exit.') 83 copy_opts.add_argument( 84 '-v', '--verbose', dest='verbose', action='store_true', 85 help='Verbose output.') 86 copy_opts.add_argument( 87 '--progress', dest='progress', action='store_true', 88 help='Report progress on copying collections. (default)') 89 copy_opts.add_argument( 90 '--no-progress', dest='progress', action='store_false', 91 help='Do not report progress on copying collections.') 92 copy_opts.add_argument( 93 '-f', '--force', dest='force', action='store_true', 94 help='Perform copy even if the object appears to exist at the remote destination.') 95 copy_opts.add_argument( 96 '--src', dest='source_arvados', 97 help='The cluster id of the source Arvados instance. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will be inferred from the UUID of the object being copied.') 98 copy_opts.add_argument( 99 '--dst', dest='destination_arvados', 100 help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, will use ARVADOS_API_HOST from environment.') 101 copy_opts.add_argument( 102 '--recursive', dest='recursive', action='store_true', 103 help='Recursively copy any dependencies for this object, and subprojects. (default)') 104 copy_opts.add_argument( 105 '--no-recursive', dest='recursive', action='store_false', 106 help='Do not copy any dependencies or subprojects.') 107 copy_opts.add_argument( 108 '--project-uuid', dest='project_uuid', 109 help='The UUID of the project at the destination to which the collection or workflow should be copied.') 110 copy_opts.add_argument( 111 '--storage-classes', dest='storage_classes', 112 help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.') 113 copy_opts.add_argument("--varying-url-params", type=str, default="", 114 help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.") 115 116 copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False, 117 help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).") 118 119 copy_opts.add_argument( 120 'object_uuid', 121 help='The UUID of the object to be copied.') 122 copy_opts.set_defaults(progress=True) 123 copy_opts.set_defaults(recursive=True) 124 125 parser = argparse.ArgumentParser( 126 description='Copy a workflow, collection or project from one Arvados instance to another. 
On success, the uuid of the copied object is printed to stdout.', 127 parents=[copy_opts, arv_cmd.retry_opt]) 128 args = parser.parse_args() 129 130 if args.storage_classes: 131 args.storage_classes = [x for x in args.storage_classes.strip().replace(' ', '').split(',') if x] 132 133 if args.verbose: 134 logger.setLevel(logging.DEBUG) 135 else: 136 logger.setLevel(logging.INFO) 137 138 if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid): 139 args.source_arvados = args.object_uuid[:5] 140 141 # Create API clients for the source and destination instances 142 src_arv = api_for_instance(args.source_arvados, args.retries) 143 dst_arv = api_for_instance(args.destination_arvados, args.retries) 144 145 if not args.project_uuid: 146 args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"] 147 148 # Identify the kind of object we have been given, and begin copying. 149 t = uuid_type(src_arv, args.object_uuid) 150 151 try: 152 if t == 'Collection': 153 set_src_owner_uuid(src_arv.collections(), args.object_uuid, args) 154 result = copy_collection(args.object_uuid, 155 src_arv, dst_arv, 156 args) 157 elif t == 'Workflow': 158 set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args) 159 result = copy_workflow(args.object_uuid, src_arv, dst_arv, args) 160 elif t == 'Group': 161 set_src_owner_uuid(src_arv.groups(), args.object_uuid, args) 162 result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args) 163 elif t == 'httpURL': 164 result = copy_from_http(args.object_uuid, src_arv, dst_arv, args) 165 else: 166 abort("cannot copy object {} of type {}".format(args.object_uuid, t)) 167 except Exception as e: 168 logger.error("%s", e, exc_info=args.verbose) 169 exit(1) 170 171 # Clean up any outstanding temp git repositories. 172 for d in listvalues(local_repo_dir): 173 shutil.rmtree(d, ignore_errors=True) 174 175 if not result: 176 exit(1) 177 178 # If no exception was thrown and the response does not have an 179 # error_token field, presume success 180 if result is None or 'error_token' in result or 'uuid' not in result: 181 if result: 182 logger.error("API server returned an error result: {}".format(result)) 183 exit(1) 184 185 print(result['uuid']) 186 187 if result.get('partial_error'): 188 logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error'])) 189 exit(1) 190 191 logger.info("Success: created copy with uuid {}".format(result['uuid'])) 192 exit(0)
211def api_for_instance(instance_name, num_retries): 212 if not instance_name: 213 # Use environment 214 return arvados.api('v1') 215 216 if '/' in instance_name: 217 config_file = instance_name 218 else: 219 config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name)) 220 221 try: 222 cfg = arvados.config.load(config_file) 223 except (IOError, OSError) as e: 224 abort(("Could not open config file {}: {}\n" + 225 "You must make sure that your configuration tokens\n" + 226 "for Arvados instance {} are in {} and that this\n" + 227 "file is readable.").format( 228 config_file, e, instance_name, config_file)) 229 230 if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg: 231 api_is_insecure = ( 232 cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set( 233 ['1', 't', 'true', 'y', 'yes'])) 234 client = arvados.api('v1', 235 host=cfg['ARVADOS_API_HOST'], 236 token=cfg['ARVADOS_API_TOKEN'], 237 insecure=api_is_insecure, 238 num_retries=num_retries, 239 ) 240 else: 241 abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name)) 242 return client
256def filter_iter(arg): 257 """Iterate a filter string-or-list. 258 259 Pass in a filter field that can either be a string or list. 260 This will iterate elements as if the field had been written as a list. 261 """ 262 if isinstance(arg, basestring): 263 return iter((arg,)) 264 else: 265 return iter(arg)
Iterate a filter string-or-list.
Pass in a filter field that can either be a string or list. This will iterate elements as if the field had been written as a list.
267def migrate_repository_filter(repo_filter, src_repository, dst_repository): 268 """Update a single repository filter in-place for the destination. 269 270 If the filter checks that the repository is src_repository, it is 271 updated to check that the repository is dst_repository. If it does 272 anything else, this function raises ValueError. 273 """ 274 if src_repository is None: 275 raise ValueError("component does not specify a source repository") 276 elif dst_repository is None: 277 raise ValueError("no destination repository specified to update repository filter") 278 elif repo_filter[1:] == ['=', src_repository]: 279 repo_filter[2] = dst_repository 280 elif repo_filter[1:] == ['in', [src_repository]]: 281 repo_filter[2] = [dst_repository] 282 else: 283 raise ValueError("repository filter is not a simple source match")
Update a single repository filter in-place for the destination.
If the filter checks that the repository is src_repository, it is updated to check that the repository is dst_repository. If it does anything else, this function raises ValueError.
285def migrate_script_version_filter(version_filter): 286 """Update a single script_version filter in-place for the destination. 287 288 Currently this function checks that all the filter operands are Git 289 commit hashes. If they're not, it raises ValueError to indicate that 290 the filter is not portable. It could be extended to make other 291 transformations in the future. 292 """ 293 if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])): 294 raise ValueError("script_version filter is not limited to commit hashes")
Update a single script_version filter in-place for the destination.
Currently this function checks that all the filter operands are Git commit hashes. If they’re not, it raises ValueError to indicate that the filter is not portable. It could be extended to make other transformations in the future.
296def attr_filtered(filter_, *attr_names): 297 """Return True if filter_ applies to any of attr_names, else False.""" 298 return any((name == 'any') or (name in attr_names) 299 for name in filter_iter(filter_[0]))
Return True if filter_ applies to any of attr_names, else False.
301@contextlib.contextmanager 302def exception_handler(handler, *exc_types): 303 """If any exc_types are raised in the block, call handler on the exception.""" 304 try: 305 yield 306 except exc_types as error: 307 handler(error)
If any exc_types are raised in the block, call handler on the exception.
322def copy_workflow(wf_uuid, src, dst, args): 323 # fetch the workflow from the source instance 324 wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries) 325 326 if not wf["definition"]: 327 logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid)) 328 329 # copy collections and docker images 330 if args.recursive and wf["definition"]: 331 env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc, 332 "ARVADOS_API_TOKEN": src.api_token, 333 "PATH": os.environ["PATH"]} 334 try: 335 result = subprocess.run( 336 ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid], 337 env=env, 338 stdout=subprocess.PIPE, 339 universal_newlines=True, 340 ) 341 except FileNotFoundError: 342 no_arv_copy = True 343 else: 344 no_arv_copy = result.returncode == 2 345 346 if no_arv_copy: 347 raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.') 348 elif result.returncode != 0: 349 raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps') 350 351 locations = json.loads(result.stdout) 352 353 if locations: 354 copy_collections(locations, src, dst, args) 355 356 # copy the workflow itself 357 del wf['uuid'] 358 wf['owner_uuid'] = args.project_uuid 359 360 existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid], 361 ["name", "=", wf["name"]]]).execute() 362 if len(existing["items"]) == 0: 363 return dst.workflows().create(body=wf).execute(num_retries=args.retries) 364 else: 365 return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
368def workflow_collections(obj, locations, docker_images): 369 if isinstance(obj, dict): 370 loc = obj.get('location', None) 371 if loc is not None: 372 if loc.startswith("keep:"): 373 locations.append(loc[5:]) 374 375 docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None) 376 if docker_image is not None: 377 ds = docker_image.split(":", 1) 378 tag = ds[1] if len(ds)==2 else 'latest' 379 docker_images[ds[0]] = tag 380 381 for x in obj: 382 workflow_collections(obj[x], locations, docker_images) 383 elif isinstance(obj, list): 384 for x in obj: 385 workflow_collections(x, locations, docker_images)
399def copy_collections(obj, src, dst, args): 400 401 def copy_collection_fn(collection_match): 402 """Helper function for regex substitution: copies a single collection, 403 identified by the collection_match MatchObject, to the 404 destination. Returns the destination collection uuid (or the 405 portable data hash if that's what src_id is). 406 407 """ 408 src_id = collection_match.group(0) 409 if src_id not in collections_copied: 410 dst_col = copy_collection(src_id, src, dst, args) 411 if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]: 412 collections_copied[src_id] = src_id 413 else: 414 collections_copied[src_id] = dst_col['uuid'] 415 return collections_copied[src_id] 416 417 if isinstance(obj, basestring): 418 # Copy any collections identified in this string to dst, replacing 419 # them with the dst uuids as necessary. 420 obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj) 421 obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj) 422 return obj 423 elif isinstance(obj, dict): 424 return type(obj)((v, copy_collections(obj[v], src, dst, args)) 425 for v in obj) 426 elif isinstance(obj, list): 427 return type(obj)(copy_collections(v, src, dst, args) for v in obj) 428 return obj
431def total_collection_size(manifest_text): 432 """Return the total number of bytes in this collection (excluding 433 duplicate blocks).""" 434 435 total_bytes = 0 436 locators_seen = {} 437 for line in manifest_text.splitlines(): 438 words = line.split() 439 for word in words[1:]: 440 try: 441 loc = arvados.KeepLocator(word) 442 except ValueError: 443 continue # this word isn't a locator, skip it 444 if loc.md5sum not in locators_seen: 445 locators_seen[loc.md5sum] = True 446 total_bytes += loc.size 447 448 return total_bytes
Return the total number of bytes in this collection (excluding duplicate blocks).
450def create_collection_from(c, src, dst, args): 451 """Create a new collection record on dst, and copy Docker metadata if 452 available.""" 453 454 collection_uuid = c['uuid'] 455 body = {} 456 for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'): 457 body[d] = c[d] 458 459 if not body["name"]: 460 body['name'] = "copied from " + collection_uuid 461 462 if args.storage_classes: 463 body['storage_classes_desired'] = args.storage_classes 464 465 body['owner_uuid'] = args.project_uuid 466 467 dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries) 468 469 # Create docker_image_repo+tag and docker_image_hash links 470 # at the destination. 471 for link_class in ("docker_image_repo+tag", "docker_image_hash"): 472 docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items'] 473 474 for src_link in docker_links: 475 body = {key: src_link[key] 476 for key in ['link_class', 'name', 'properties']} 477 body['head_uuid'] = dst_collection['uuid'] 478 body['owner_uuid'] = args.project_uuid 479 480 lk = dst.links().create(body=body).execute(num_retries=args.retries) 481 logger.debug('created dst link {}'.format(lk)) 482 483 return dst_collection
Create a new collection record on dst, and copy Docker metadata if available.
507def copy_collection(obj_uuid, src, dst, args): 508 if arvados.util.keep_locator_pattern.match(obj_uuid): 509 # If the obj_uuid is a portable data hash, it might not be 510 # uniquely identified with a particular collection. As a 511 # result, it is ambiguous as to what name to use for the copy. 512 # Apply some heuristics to pick which collection to get the 513 # name from. 514 srccol = src.collections().list( 515 filters=[['portable_data_hash', '=', obj_uuid]], 516 order="created_at asc" 517 ).execute(num_retries=args.retries) 518 519 items = srccol.get("items") 520 521 if not items: 522 logger.warning("Could not find collection with portable data hash %s", obj_uuid) 523 return 524 525 c = None 526 527 if len(items) == 1: 528 # There's only one collection with the PDH, so use that. 529 c = items[0] 530 if not c: 531 # See if there is a collection that's in the same project 532 # as the root item (usually a workflow) being copied. 533 for i in items: 534 if i.get("owner_uuid") == src_owner_uuid and i.get("name"): 535 c = i 536 break 537 if not c: 538 # Didn't find any collections located in the same project, so 539 # pick the oldest collection that has a name assigned to it. 540 for i in items: 541 if i.get("name"): 542 c = i 543 break 544 if not c: 545 # None of the collections have names (?!), so just pick the 546 # first one. 547 c = items[0] 548 549 # list() doesn't return manifest text (and we don't want it to, 550 # because we don't need the same maninfest text sent to us 50 551 # times) so go and retrieve the collection object directly 552 # which will include the manifest text. 553 c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries) 554 else: 555 # Assume this is an actual collection uuid, so fetch it directly. 556 c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries) 557 558 # If a collection with this hash already exists at the 559 # destination, and 'force' is not true, just return that 560 # collection. 561 if not args.force: 562 if 'portable_data_hash' in c: 563 colhash = c['portable_data_hash'] 564 else: 565 colhash = c['uuid'] 566 dstcol = dst.collections().list( 567 filters=[['portable_data_hash', '=', colhash]] 568 ).execute(num_retries=args.retries) 569 if dstcol['items_available'] > 0: 570 for d in dstcol['items']: 571 if ((args.project_uuid == d['owner_uuid']) and 572 (c.get('name') == d['name']) and 573 (c['portable_data_hash'] == d['portable_data_hash'])): 574 return d 575 c['manifest_text'] = dst.collections().get( 576 uuid=dstcol['items'][0]['uuid'] 577 ).execute(num_retries=args.retries)['manifest_text'] 578 return create_collection_from(c, src, dst, args) 579 580 # Fetch the collection's manifest. 581 manifest = c['manifest_text'] 582 logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest) 583 584 # Copy each block from src_keep to dst_keep. 585 # Use the newly signed locators returned from dst_keep to build 586 # a new manifest as we go. 
587 src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries) 588 dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries) 589 dst_manifest = io.StringIO() 590 dst_locators = {} 591 bytes_written = 0 592 bytes_expected = total_collection_size(manifest) 593 if args.progress: 594 progress_writer = ProgressWriter(human_progress) 595 else: 596 progress_writer = None 597 598 # go through the words 599 # put each block loc into 'get' queue 600 # 'get' threads get block and put it into 'put' queue 601 # 'put' threads put block and then update dst_locators 602 # 603 # after going through the whole manifest we go back through it 604 # again and build dst_manifest 605 606 lock = threading.Lock() 607 608 # the get queue should be unbounded because we'll add all the 609 # block hashes we want to get, but these are small 610 get_queue = queue.Queue() 611 612 threadcount = 4 613 614 # the put queue contains full data blocks 615 # and if 'get' is faster than 'put' we could end up consuming 616 # a great deal of RAM if it isn't bounded. 617 put_queue = queue.Queue(threadcount) 618 transfer_error = [] 619 620 def get_thread(): 621 while True: 622 word = get_queue.get() 623 if word is None: 624 put_queue.put(None) 625 get_queue.task_done() 626 return 627 628 blockhash = arvados.KeepLocator(word).md5sum 629 with lock: 630 if blockhash in dst_locators: 631 # Already uploaded 632 get_queue.task_done() 633 continue 634 635 try: 636 logger.debug("Getting block %s", word) 637 data = src_keep.get(word) 638 put_queue.put((word, data)) 639 except e: 640 logger.error("Error getting block %s: %s", word, e) 641 transfer_error.append(e) 642 try: 643 # Drain the 'get' queue so we end early 644 while True: 645 get_queue.get(False) 646 get_queue.task_done() 647 except queue.Empty: 648 pass 649 finally: 650 get_queue.task_done() 651 652 def put_thread(): 653 nonlocal bytes_written 654 while True: 655 item = put_queue.get() 656 if item is None: 657 put_queue.task_done() 658 return 659 660 word, data = item 661 loc = arvados.KeepLocator(word) 662 blockhash = loc.md5sum 663 with lock: 664 if blockhash in dst_locators: 665 # Already uploaded 666 put_queue.task_done() 667 continue 668 669 try: 670 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size) 671 dst_locator = dst_keep.put(data, classes=(args.storage_classes or [])) 672 with lock: 673 dst_locators[blockhash] = dst_locator 674 bytes_written += loc.size 675 if progress_writer: 676 progress_writer.report(obj_uuid, bytes_written, bytes_expected) 677 except e: 678 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e) 679 try: 680 # Drain the 'get' queue so we end early 681 while True: 682 get_queue.get(False) 683 get_queue.task_done() 684 except queue.Empty: 685 pass 686 transfer_error.append(e) 687 finally: 688 put_queue.task_done() 689 690 for line in manifest.splitlines(): 691 words = line.split() 692 for word in words[1:]: 693 try: 694 loc = arvados.KeepLocator(word) 695 except ValueError: 696 # If 'word' can't be parsed as a locator, 697 # presume it's a filename. 
698 continue 699 700 get_queue.put(word) 701 702 for i in range(0, threadcount): 703 get_queue.put(None) 704 705 for i in range(0, threadcount): 706 threading.Thread(target=get_thread, daemon=True).start() 707 708 for i in range(0, threadcount): 709 threading.Thread(target=put_thread, daemon=True).start() 710 711 get_queue.join() 712 put_queue.join() 713 714 if len(transfer_error) > 0: 715 return {"error_token": "Failed to transfer blocks"} 716 717 for line in manifest.splitlines(): 718 words = line.split() 719 dst_manifest.write(words[0]) 720 for word in words[1:]: 721 try: 722 loc = arvados.KeepLocator(word) 723 except ValueError: 724 # If 'word' can't be parsed as a locator, 725 # presume it's a filename. 726 dst_manifest.write(' ') 727 dst_manifest.write(word) 728 continue 729 blockhash = loc.md5sum 730 dst_manifest.write(' ') 731 dst_manifest.write(dst_locators[blockhash]) 732 dst_manifest.write("\n") 733 734 if progress_writer: 735 progress_writer.report(obj_uuid, bytes_written, bytes_expected) 736 progress_writer.finish() 737 738 # Copy the manifest and save the collection. 739 logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue()) 740 741 c['manifest_text'] = dst_manifest.getvalue() 742 return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    """Find a clone URL for repo_name that we can actually read from,
    preferring https. Returns (git_url, git_config), where git_config is
    a list of extra arguments for the git command line.
    """
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    for url in priority:
        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            subprocess.run(
                ['git', *git_config, 'ls-remote', url],
                check=True,
                env={
                    'ARVADOS_API_TOKEN': api.api_token,
                    'GIT_ASKPASS': '/bin/false',
                    'HOME': os.environ['HOME'],
                },
                stdout=subprocess.DEVNULL,
            )
        except subprocess.CalledProcessError:
            pass
        else:
            git_url = url
            break
    else:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
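
# For an https clone URL such as https://git.example.com/twp.git (a
# hypothetical example), the git_config built above makes git present
# the Arvados API token as the password without prompting, roughly
# equivalent to running:
#
#   git -c credential.https://git.example.com/.username=none \
#       -c 'credential.https://git.example.com/.helper=!cred(){ cat >/dev/null; if [ "$1" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred' \
#       ls-remote https://git.example.com/twp.git
#
# with ARVADOS_API_TOKEN set in the environment and GIT_ASKPASS pointed
# at /bin/false so git never falls back to an interactive prompt.
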
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
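
# A typical call, with a hypothetical image name and tag:
#
#   copy_docker_image('arvados/jobs', 'latest', src_arv, dst_arv, args)
#
# list_images_in_arv yields (collection_uuid, metadata) pairs, so the
# first match identifies the image collection to copy. If docker_image
# is itself a Keep locator, that collection is copied directly instead.
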
def copy_project(obj_uuid, src, dst, owner_uuid, args):
    """Copy the project identified by obj_uuid from src to dst, along
    with its collections, workflows, and (if args.recursive) subprojects.
    Per-item failures are accumulated in the returned record's
    'partial_error' field rather than raised.
    """

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    else:
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        body={"group": {
                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    # Copy subprojects
    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record
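
# Because copy_project records failures in partial_error instead of
# raising, a caller can copy what it can and report the rest. A sketch
# of checking the result, with a hypothetical project UUID:
#
#   result = copy_project('zzzzz-j7d0g-0123456789abcde', src_arv, dst_arv,
#                         args.project_uuid, args)
#   if result["partial_error"]:
#       logger.warning("copied with errors: %s", result["partial_error"])
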
def uuid_type(api, object_uuid):
    """Return the API schema class ('Collection', 'Workflow', 'Group',
    etc.) named by object_uuid, 'httpURL' for http(s) URLs, or None if
    the type cannot be determined.
    """
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
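
# Arvados UUIDs have the form {cluster id}-{type prefix}-{node id},
# and the middle segment selects the object class via the uuidPrefix
# recorded in the API discovery document. With hypothetical UUIDs:
#
#   uuid_type(api, 'zzzzz-4zz18-0123456789abcde')   # => 'Collection'
#   uuid_type(api, 'zzzzz-j7d0g-0123456789abcde')   # => 'Group'
#   uuid_type(api, 'https://example.com/data.tar')  # => 'httpURL'
#
# Plain Keep locators short-circuit to 'Collection' before the prefix
# lookup is attempted.
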
def copy_from_http(url, src, dst, args):
    """Copy the contents of the given HTTP(S) url into Keep at dst. If
    the url has already been downloaded to a collection on src, copy
    that collection; otherwise download it directly to dst.
    """
    project_uuid = args.project_uuid
    varying_url_params = args.varying_url_params
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = arvados.http_to_keep.check_cached_url(src, project_uuid, url, {},
                                                   varying_url_params=varying_url_params,
                                                   prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    cached = arvados.http_to_keep.http_to_keep(dst, project_uuid, url,
                                               varying_url_params=varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}
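
# As used above, check_cached_url and http_to_keep both return a tuple
# whose third element is the UUID of the collection holding the
# downloaded content (None when there is no cached copy yet), which is
# why cached[2] is what gets copied or reported. From the command line
# this path is reached by giving arv-copy a URL instead of a UUID, e.g.
# (hypothetical URL and project UUID):
#
#   arv-copy --project-uuid zzzzz-j7d0g-0123456789abcde \
#       https://example.com/dataset.tar.gz
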
def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
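
# bytes_written >> 20 is integer division by 2**20, i.e. whole
# mebibytes, so 52428800 bytes written out of 104857600 expected
# renders as (hypothetical UUID):
#
#   human_progress('zzzzz-4zz18-0123456789abcde', 52428800, 104857600)
#   # => '\rzzzzz-4zz18-0123456789abcde: 50M / 100M 50.0% '
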
class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")
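
# ProgressWriter just pairs a rendering function with an output stream
# (stderr); copy_collection uses it as:
#
#   progress_writer = ProgressWriter(human_progress)
#   progress_writer.report(obj_uuid, bytes_written, bytes_expected)
#   progress_writer.finish()  # terminate the \r-rewritten line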