arvados.commands.arv_copy
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# arv-copy [--recursive] [--no-recursive] object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a workflow, arv-copy copies the workflow,
# input collections, and docker images). If
# --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have configuration files {src}.conf and
# {dst}.conf in a standard configuration directory with valid login credentials
# for instances src and dst. If either of these files is not found,
# arv-copy will issue an error.

import argparse
import contextlib
import getpass
import os
import re
import shutil
import subprocess
import sys
import logging
import tempfile
import urllib.parse
import io
import json
import queue
import threading
import errno

import httplib2.error
import googleapiclient

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
from arvados.logging import log_handler

from arvados._internal import basedirs, http_to_keep
from arvados._version import __version__

COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# Set this up so connection errors get logged.
googleapi_logger = logging.getLogger('googleapiclient.http')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# directories.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
#
local_repo_dir = {}

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None
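# The {src}.conf / {dst}.conf files mentioned above are simple KEY=value
# files; api_for_instance() below reads at most three keys from them, so a
# hypothetical zzzzz.conf needs no more than the following (host, token,
# and cluster ID are placeholders, not real credentials):
#
#   ARVADOS_API_HOST=zzzzz.example.arvadosapi.com
#   ARVADOS_API_TOKEN=v2/zzzzz-gj3su-0123456789abcde/examplesecret
#   ARVADOS_API_HOST_INSECURE=false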
def main():
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--src', dest='source_arvados',
        help="""
Client configuration location for the source Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
""",
    )
    copy_opts.add_argument(
        '--dst', dest='destination_arvados',
        help="""
Client configuration location for the destination Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will use the default client configuration from the environment or `settings.conf`.
""",
    )
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object, and subprojects. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies or subprojects.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
    copy_opts.add_argument(
        '--replication',
        type=arv_cmd.RangedValue(int, range(1, sys.maxsize)),
        metavar='N',
        help="""
Number of replicas per storage class for the copied collections at the destination.
If not provided (or if provided with invalid value),
use the destination's default replication-level setting (if found),
or the fallback value 2.
""")
    copy_opts.add_argument(
        '--storage-classes',
        type=arv_cmd.UniqueSplit(),
        help='Comma separated list of storage classes to be used when saving data to the destination Arvados instance.')
    copy_opts.add_argument(
        "--varying-url-params", type=str, default="",
        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
    copy_opts.add_argument(
        "--prefer-cached-downloads", action="store_true", default=False,
        help="If an HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")

    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
        args.source_arvados = args.object_uuid[:5]

    if not args.destination_arvados and args.project_uuid:
        args.destination_arvados = args.project_uuid[:5]

    # Make sure errors trying to connect to clusters get logged.
    googleapi_logger.setLevel(logging.WARN)
    googleapi_logger.addHandler(log_handler)

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados, args.retries)
    dst_arv = api_for_instance(args.destination_arvados, args.retries)

    # Once we've successfully contacted the clusters, we probably
    # don't want to see logging about retries (unless the user asked
    # for verbose output).
    if not args.verbose:
        googleapi_logger.setLevel(logging.ERROR)

    if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]:
        logger.info("Copying within cluster %s", src_arv.config()["ClusterID"])
    else:
        logger.info("Source cluster is %s", src_arv.config()["ClusterID"])
        logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"])

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)

    try:
        if t == 'Collection':
            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
            result = copy_collection(args.object_uuid,
                                     src_arv, dst_arv,
                                     args)
        elif t == 'Workflow':
            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
        elif t == 'Group':
            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
        elif t == 'httpURL':
            result = copy_from_http(args.object_uuid, src_arv, dst_arv, args)
        else:
            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
    except Exception as e:
        logger.error("%s", e, exc_info=args.verbose)
        exit(1)

    # Clean up any outstanding temp git repositories.
    for d in local_repo_dir.values():
        shutil.rmtree(d, ignore_errors=True)

    if not result:
        exit(1)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if result is None or 'error_token' in result or 'uuid' not in result:
        if result:
            logger.error("API server returned an error result: {}".format(result))
        exit(1)

    print(result['uuid'])

    if result.get('partial_error'):
        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)

def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")
# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either relative or absolute) to a file with Arvados configuration
# settings.
#
# Otherwise, it is presumed to be the name of a file in a standard
# configuration directory.
#
def api_for_instance(instance_name, num_retries):
    msg = []
    if instance_name:
        if '/' in instance_name:
            config_file = instance_name
        else:
            dirs = basedirs.BaseDirectories('CONFIG')
            config_file = next(dirs.search(f'{instance_name}.conf'), '')

        try:
            cfg = arvados.config.load(config_file)

            if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
                api_is_insecure = (
                    cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                        ['1', 't', 'true', 'y', 'yes']))
                return arvados.api('v1',
                                   host=cfg['ARVADOS_API_HOST'],
                                   token=cfg['ARVADOS_API_TOKEN'],
                                   insecure=api_is_insecure,
                                   num_retries=num_retries,
                                   )
            else:
                msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file))
        except OSError as e:
            if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH):
                verb = 'connect to instance from'
            elif config_file:
                verb = 'open'
            else:
                verb = 'find'
                searchlist = ":".join(str(p) for p in dirs.search_paths())
                config_file = f'{instance_name}.conf in path {searchlist}'
            msg.append(("Could not {} config file {}: {}").format(
                verb, config_file, e.strerror))
        except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e:
            msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e))

    default_api = None
    default_instance = None
    try:
        default_api = arvados.api('v1', num_retries=num_retries)
        default_instance = default_api.config()["ClusterID"]
    except ValueError:
        pass
    except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e:
        msg.append("Failed to connect to default instance, error was {}".format(e))

    if default_api is not None and (not instance_name or instance_name == default_instance):
        # Use default settings
        return default_api

    if instance_name and default_instance and instance_name != default_instance:
        msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name))

    for m in msg:
        logger.error(m)

    abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')

# Check if git is available
def check_git_availability():
    try:
        subprocess.run(
            ['git', '--version'],
            check=True,
            stdout=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        abort('git command is not available. Please ensure git is installed.')
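# Resolution sketch (hypothetical arguments): api_for_instance('zzzzz', 3)
# searches the standard configuration directories for 'zzzzz.conf';
# api_for_instance('./zzzzz.conf', 3) loads that path directly because it
# contains a slash; api_for_instance(None, 3) falls back to the default
# arvados.api('v1') client built from the environment or settings.conf.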
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, str):
        yield arg
    else:
        yield from arg

def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")

def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")

def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))

@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)
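# For example (illustrative values): with src_repository='twp' and
# dst_repository='twp2', migrate_repository_filter() above rewrites
# ['repository', '=', 'twp'] in place to ['repository', '=', 'twp2'], and
# ['repository', 'in', ['twp']] to ['repository', 'in', ['twp2']]; any
# other filter shape raises ValueError.
#
# A minimal exception_handler() usage sketch (hypothetical handler and
# directory name):
#
#   with exception_handler(lambda err: logger.warning("cleanup failed: %s", err), OSError):
#       shutil.rmtree(scratch_dir)
#
# An OSError raised in the body is passed to the handler instead of
# propagating; other exception types propagate normally.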
# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        try:
            result = subprocess.run(
                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
                env=env,
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
        except FileNotFoundError:
            no_arv_copy = True
        else:
            no_arv_copy = result.returncode == 2

        if no_arv_copy:
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        locations = json.loads(result.stdout)

        if locations:
            copy_collections(locations, src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid

    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    else:
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)


def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)

# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst.  obj may be a dict or a list, in which case we run
# copy_collections on every value it contains.  If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
#   "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).

        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, str):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj


def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
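# For reference, a Keep manifest line looks like this (made-up example):
#
#   . 930625b054ce894ac40596c3f5a0d947+33 0:33:foo.txt
#
# i.e. a stream name, block locators of the form <md5sum>+<size>[+hints],
# then file tokens.  total_collection_size() above counts the +size of
# each distinct md5sum once, so a block shared by several files is not
# double-counted.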
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    body = {}
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        body[d] = c[d]

    if not body["name"]:
        body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection

# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks.  Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    if args.replication is None:
        # Obtain default or fallback collection replication setting on the
        # destination
        try:
            args.replication = int(dst.config()["Collections"]["DefaultReplication"])
        except (KeyError, TypeError, ValueError):
            args.replication = 2

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest

    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    threadcount = 4

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()

    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()
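    # Sketch of the rewrite being set up here (made-up values): a source
    # manifest word like
    #   930625b054ce894ac40596c3f5a0d947+33+A<src-signature>@65b1a568
    # is fetched from src_keep and written to dst_keep, and the second
    # pass below substitutes the freshly signed locator returned by
    # dst_keep.put().  Only the signature hint changes, so the portable
    # data hash (computed with signatures stripped) is preserved.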
    for line in manifest.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                continue

            get_queue.put(word)

    for i in range(0, threadcount):
        get_queue.put(None)

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    get_queue.join()
    put_queue.join()

    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest.write(words[0])
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
                continue
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst.  Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

def copy_project(obj_uuid, src, dst, owner_uuid, args):

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    else:
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        body={"group": {
                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record
# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository)
#
def git_rev_parse(rev, repo):
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        text=True,
    )
    # With text=True, proc.stdout is already a string; the original
    # proc.stdout.read() would have raised AttributeError.
    return proc.stdout.strip()

# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second field of the uuid.  This function consults the api's
# schema to identify the object class.
#
# It returns a string such as 'Collection', 'Workflow', etc.
#
# Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
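# For example, a (made-up) uuid 'zzzzz-4zz18-0123456789abcde' splits into
# three fields; the type prefix '4zz18' is looked up in the discovery
# schema, which maps it to 'Collection' (a 'j7d0g' prefix would map to
# 'Group').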
def copy_from_http(url, src, dst, args):

    project_uuid = args.project_uuid
    # Ensure string of varying parameters is well-formed
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
                                           varying_url_params=args.varying_url_params,
                                           prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    cached = http_to_keep.http_to_keep(dst, project_uuid, url,
                                       varying_url_params=args.varying_url_params,
                                       prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}


def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)


# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)

class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

if __name__ == '__main__':
    main()
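# Illustrative invocations (cluster IDs, uuids, and config file names are
# placeholders):
#
#   arv-copy --project-uuid zzzzz-j7d0g-0123456789abcde xxxxx-4zz18-0123456789abcde
#       copies a collection from cluster xxxxx (credentials read from
#       xxxxx.conf) into the given project, along with its dependencies
#       (the default --recursive behavior).
#
#   arv-copy --no-recursive --src xxxxx --dst zzzzz xxxxx-7fd4e-0123456789abcde
#       copies just the single workflow record, using xxxxx.conf and
#       zzzzz.conf for the two clusters.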
78def main(): 79 copy_opts = argparse.ArgumentParser(add_help=False) 80 81 copy_opts.add_argument( 82 '--version', action='version', version="%s %s" % (sys.argv[0], __version__), 83 help='Print version and exit.') 84 copy_opts.add_argument( 85 '-v', '--verbose', dest='verbose', action='store_true', 86 help='Verbose output.') 87 copy_opts.add_argument( 88 '--progress', dest='progress', action='store_true', 89 help='Report progress on copying collections. (default)') 90 copy_opts.add_argument( 91 '--no-progress', dest='progress', action='store_false', 92 help='Do not report progress on copying collections.') 93 copy_opts.add_argument( 94 '-f', '--force', dest='force', action='store_true', 95 help='Perform copy even if the object appears to exist at the remote destination.') 96 copy_opts.add_argument( 97 '--src', dest='source_arvados', 98 help=""" 99Client configuration location for the source Arvados cluster. 100May be either a configuration file path, or a plain identifier like `foo` 101to search for a configuration file `foo.conf` under a systemd or XDG configuration directory. 102If not provided, will search for a configuration file named after the cluster ID of the source object UUID. 103""", 104 ) 105 copy_opts.add_argument( 106 '--dst', dest='destination_arvados', 107 help=""" 108Client configuration location for the destination Arvados cluster. 109May be either a configuration file path, or a plain identifier like `foo` 110to search for a configuration file `foo.conf` under a systemd or XDG configuration directory. 111If not provided, will use the default client configuration from the environment or `settings.conf`. 112""", 113 ) 114 copy_opts.add_argument( 115 '--recursive', dest='recursive', action='store_true', 116 help='Recursively copy any dependencies for this object, and subprojects. (default)') 117 copy_opts.add_argument( 118 '--no-recursive', dest='recursive', action='store_false', 119 help='Do not copy any dependencies or subprojects.') 120 copy_opts.add_argument( 121 '--project-uuid', dest='project_uuid', 122 help='The UUID of the project at the destination to which the collection or workflow should be copied.') 123 copy_opts.add_argument( 124 '--replication', 125 type=arv_cmd.RangedValue(int, range(1, sys.maxsize)), 126 metavar='N', 127 help=""" 128Number of replicas per storage class for the copied collections at the destination. 129If not provided (or if provided with invalid value), 130use the destination's default replication-level setting (if found), 131or the fallback value 2. 132""") 133 copy_opts.add_argument( 134 '--storage-classes', 135 type=arv_cmd.UniqueSplit(), 136 help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.') 137 copy_opts.add_argument("--varying-url-params", type=str, default="", 138 help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.") 139 140 copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False, 141 help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).") 142 143 copy_opts.add_argument( 144 'object_uuid', 145 help='The UUID of the object to be copied.') 146 copy_opts.set_defaults(progress=True) 147 copy_opts.set_defaults(recursive=True) 148 149 parser = argparse.ArgumentParser( 150 description='Copy a workflow, collection or project from one Arvados instance to another. 
On success, the uuid of the copied object is printed to stdout.', 151 parents=[copy_opts, arv_cmd.retry_opt]) 152 args = parser.parse_args() 153 154 if args.verbose: 155 logger.setLevel(logging.DEBUG) 156 else: 157 logger.setLevel(logging.INFO) 158 159 if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid): 160 args.source_arvados = args.object_uuid[:5] 161 162 if not args.destination_arvados and args.project_uuid: 163 args.destination_arvados = args.project_uuid[:5] 164 165 # Make sure errors trying to connect to clusters get logged. 166 googleapi_logger.setLevel(logging.WARN) 167 googleapi_logger.addHandler(log_handler) 168 169 # Create API clients for the source and destination instances 170 src_arv = api_for_instance(args.source_arvados, args.retries) 171 dst_arv = api_for_instance(args.destination_arvados, args.retries) 172 173 # Once we've successfully contacted the clusters, we probably 174 # don't want to see logging about retries (unless the user asked 175 # for verbose output). 176 if not args.verbose: 177 googleapi_logger.setLevel(logging.ERROR) 178 179 if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]: 180 logger.info("Copying within cluster %s", src_arv.config()["ClusterID"]) 181 else: 182 logger.info("Source cluster is %s", src_arv.config()["ClusterID"]) 183 logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"]) 184 185 if not args.project_uuid: 186 args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"] 187 188 # Identify the kind of object we have been given, and begin copying. 189 t = uuid_type(src_arv, args.object_uuid) 190 191 try: 192 if t == 'Collection': 193 set_src_owner_uuid(src_arv.collections(), args.object_uuid, args) 194 result = copy_collection(args.object_uuid, 195 src_arv, dst_arv, 196 args) 197 elif t == 'Workflow': 198 set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args) 199 result = copy_workflow(args.object_uuid, src_arv, dst_arv, args) 200 elif t == 'Group': 201 set_src_owner_uuid(src_arv.groups(), args.object_uuid, args) 202 result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args) 203 elif t == 'httpURL': 204 result = copy_from_http(args.object_uuid, src_arv, dst_arv, args) 205 else: 206 abort("cannot copy object {} of type {}".format(args.object_uuid, t)) 207 except Exception as e: 208 logger.error("%s", e, exc_info=args.verbose) 209 exit(1) 210 211 # Clean up any outstanding temp git repositories. 212 for d in local_repo_dir.values(): 213 shutil.rmtree(d, ignore_errors=True) 214 215 if not result: 216 exit(1) 217 218 # If no exception was thrown and the response does not have an 219 # error_token field, presume success 220 if result is None or 'error_token' in result or 'uuid' not in result: 221 if result: 222 logger.error("API server returned an error result: {}".format(result)) 223 exit(1) 224 225 print(result['uuid']) 226 227 if result.get('partial_error'): 228 logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error'])) 229 exit(1) 230 231 logger.info("Success: created copy with uuid {}".format(result['uuid'])) 232 exit(0)
251def api_for_instance(instance_name, num_retries): 252 msg = [] 253 if instance_name: 254 if '/' in instance_name: 255 config_file = instance_name 256 else: 257 dirs = basedirs.BaseDirectories('CONFIG') 258 config_file = next(dirs.search(f'{instance_name}.conf'), '') 259 260 try: 261 cfg = arvados.config.load(config_file) 262 263 if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg: 264 api_is_insecure = ( 265 cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set( 266 ['1', 't', 'true', 'y', 'yes'])) 267 return arvados.api('v1', 268 host=cfg['ARVADOS_API_HOST'], 269 token=cfg['ARVADOS_API_TOKEN'], 270 insecure=api_is_insecure, 271 num_retries=num_retries, 272 ) 273 else: 274 msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file)) 275 except OSError as e: 276 if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH): 277 verb = 'connect to instance from' 278 elif config_file: 279 verb = 'open' 280 else: 281 verb = 'find' 282 searchlist = ":".join(str(p) for p in dirs.search_paths()) 283 config_file = f'{instance_name}.conf in path {searchlist}' 284 msg.append(("Could not {} config file {}: {}").format( 285 verb, config_file, e.strerror)) 286 except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e: 287 msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e)) 288 289 default_api = None 290 default_instance = None 291 try: 292 default_api = arvados.api('v1', num_retries=num_retries) 293 default_instance = default_api.config()["ClusterID"] 294 except ValueError: 295 pass 296 except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e: 297 msg.append("Failed to connect to default instance, error was {}".format(e)) 298 299 if default_api is not None and (not instance_name or instance_name == default_instance): 300 # Use default settings 301 return default_api 302 303 if instance_name and default_instance and instance_name != default_instance: 304 msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name)) 305 306 for m in msg: 307 logger.error(m) 308 309 abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')
323def filter_iter(arg): 324 """Iterate a filter string-or-list. 325 326 Pass in a filter field that can either be a string or list. 327 This will iterate elements as if the field had been written as a list. 328 """ 329 if isinstance(arg, str): 330 yield arg 331 else: 332 yield from arg
Iterate a filter string-or-list.
Pass in a filter field that can either be a string or list. This will iterate elements as if the field had been written as a list.
334def migrate_repository_filter(repo_filter, src_repository, dst_repository): 335 """Update a single repository filter in-place for the destination. 336 337 If the filter checks that the repository is src_repository, it is 338 updated to check that the repository is dst_repository. If it does 339 anything else, this function raises ValueError. 340 """ 341 if src_repository is None: 342 raise ValueError("component does not specify a source repository") 343 elif dst_repository is None: 344 raise ValueError("no destination repository specified to update repository filter") 345 elif repo_filter[1:] == ['=', src_repository]: 346 repo_filter[2] = dst_repository 347 elif repo_filter[1:] == ['in', [src_repository]]: 348 repo_filter[2] = [dst_repository] 349 else: 350 raise ValueError("repository filter is not a simple source match")
Update a single repository filter in-place for the destination.
If the filter checks that the repository is src_repository, it is updated to check that the repository is dst_repository. If it does anything else, this function raises ValueError.
352def migrate_script_version_filter(version_filter): 353 """Update a single script_version filter in-place for the destination. 354 355 Currently this function checks that all the filter operands are Git 356 commit hashes. If they're not, it raises ValueError to indicate that 357 the filter is not portable. It could be extended to make other 358 transformations in the future. 359 """ 360 if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])): 361 raise ValueError("script_version filter is not limited to commit hashes")
Update a single script_version filter in-place for the destination.
Currently this function checks that all the filter operands are Git commit hashes. If they’re not, it raises ValueError to indicate that the filter is not portable. It could be extended to make other transformations in the future.
363def attr_filtered(filter_, *attr_names): 364 """Return True if filter_ applies to any of attr_names, else False.""" 365 return any((name == 'any') or (name in attr_names) 366 for name in filter_iter(filter_[0]))
Return True if filter_ applies to any of attr_names, else False.
368@contextlib.contextmanager 369def exception_handler(handler, *exc_types): 370 """If any exc_types are raised in the block, call handler on the exception.""" 371 try: 372 yield 373 except exc_types as error: 374 handler(error)
If any exc_types are raised in the block, call handler on the exception.
389def copy_workflow(wf_uuid, src, dst, args): 390 # fetch the workflow from the source instance 391 wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries) 392 393 if not wf["definition"]: 394 logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid)) 395 396 # copy collections and docker images 397 if args.recursive and wf["definition"]: 398 env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc, 399 "ARVADOS_API_TOKEN": src.api_token, 400 "PATH": os.environ["PATH"]} 401 try: 402 result = subprocess.run( 403 ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid], 404 env=env, 405 stdout=subprocess.PIPE, 406 universal_newlines=True, 407 ) 408 except FileNotFoundError: 409 no_arv_copy = True 410 else: 411 no_arv_copy = result.returncode == 2 412 413 if no_arv_copy: 414 raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.') 415 elif result.returncode != 0: 416 raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps') 417 418 locations = json.loads(result.stdout) 419 420 if locations: 421 copy_collections(locations, src, dst, args) 422 423 # copy the workflow itself 424 del wf['uuid'] 425 wf['owner_uuid'] = args.project_uuid 426 427 existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid], 428 ["name", "=", wf["name"]]]).execute() 429 if len(existing["items"]) == 0: 430 return dst.workflows().create(body=wf).execute(num_retries=args.retries) 431 else: 432 return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
435def workflow_collections(obj, locations, docker_images): 436 if isinstance(obj, dict): 437 loc = obj.get('location', None) 438 if loc is not None: 439 if loc.startswith("keep:"): 440 locations.append(loc[5:]) 441 442 docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None) 443 if docker_image is not None: 444 ds = docker_image.split(":", 1) 445 tag = ds[1] if len(ds)==2 else 'latest' 446 docker_images[ds[0]] = tag 447 448 for x in obj: 449 workflow_collections(obj[x], locations, docker_images) 450 elif isinstance(obj, list): 451 for x in obj: 452 workflow_collections(x, locations, docker_images)
466def copy_collections(obj, src, dst, args): 467 468 def copy_collection_fn(collection_match): 469 """Helper function for regex substitution: copies a single collection, 470 identified by the collection_match MatchObject, to the 471 destination. Returns the destination collection uuid (or the 472 portable data hash if that's what src_id is). 473 474 """ 475 src_id = collection_match.group(0) 476 if src_id not in collections_copied: 477 dst_col = copy_collection(src_id, src, dst, args) 478 if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]: 479 collections_copied[src_id] = src_id 480 else: 481 collections_copied[src_id] = dst_col['uuid'] 482 return collections_copied[src_id] 483 484 if isinstance(obj, str): 485 # Copy any collections identified in this string to dst, replacing 486 # them with the dst uuids as necessary. 487 obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj) 488 obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj) 489 return obj 490 elif isinstance(obj, dict): 491 return type(obj)((v, copy_collections(obj[v], src, dst, args)) 492 for v in obj) 493 elif isinstance(obj, list): 494 return type(obj)(copy_collections(v, src, dst, args) for v in obj) 495 return obj
498def total_collection_size(manifest_text): 499 """Return the total number of bytes in this collection (excluding 500 duplicate blocks).""" 501 502 total_bytes = 0 503 locators_seen = {} 504 for line in manifest_text.splitlines(): 505 words = line.split() 506 for word in words[1:]: 507 try: 508 loc = arvados.KeepLocator(word) 509 except ValueError: 510 continue # this word isn't a locator, skip it 511 if loc.md5sum not in locators_seen: 512 locators_seen[loc.md5sum] = True 513 total_bytes += loc.size 514 515 return total_bytes
Return the total number of bytes in this collection (excluding duplicate blocks).
517def create_collection_from(c, src, dst, args): 518 """Create a new collection record on dst, and copy Docker metadata if 519 available.""" 520 521 collection_uuid = c['uuid'] 522 body = {} 523 for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'): 524 body[d] = c[d] 525 526 if not body["name"]: 527 body['name'] = "copied from " + collection_uuid 528 529 if args.storage_classes: 530 body['storage_classes_desired'] = args.storage_classes 531 532 body['owner_uuid'] = args.project_uuid 533 534 dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries) 535 536 # Create docker_image_repo+tag and docker_image_hash links 537 # at the destination. 538 for link_class in ("docker_image_repo+tag", "docker_image_hash"): 539 docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items'] 540 541 for src_link in docker_links: 542 body = {key: src_link[key] 543 for key in ['link_class', 'name', 'properties']} 544 body['head_uuid'] = dst_collection['uuid'] 545 body['owner_uuid'] = args.project_uuid 546 547 lk = dst.links().create(body=body).execute(num_retries=args.retries) 548 logger.debug('created dst link {}'.format(lk)) 549 550 return dst_collection
Create a new collection record on dst, and copy Docker metadata if available.
574def copy_collection(obj_uuid, src, dst, args): 575 if arvados.util.keep_locator_pattern.match(obj_uuid): 576 # If the obj_uuid is a portable data hash, it might not be 577 # uniquely identified with a particular collection. As a 578 # result, it is ambiguous as to what name to use for the copy. 579 # Apply some heuristics to pick which collection to get the 580 # name from. 581 srccol = src.collections().list( 582 filters=[['portable_data_hash', '=', obj_uuid]], 583 order="created_at asc" 584 ).execute(num_retries=args.retries) 585 586 items = srccol.get("items") 587 588 if not items: 589 logger.warning("Could not find collection with portable data hash %s", obj_uuid) 590 return 591 592 c = None 593 594 if len(items) == 1: 595 # There's only one collection with the PDH, so use that. 596 c = items[0] 597 if not c: 598 # See if there is a collection that's in the same project 599 # as the root item (usually a workflow) being copied. 600 for i in items: 601 if i.get("owner_uuid") == src_owner_uuid and i.get("name"): 602 c = i 603 break 604 if not c: 605 # Didn't find any collections located in the same project, so 606 # pick the oldest collection that has a name assigned to it. 607 for i in items: 608 if i.get("name"): 609 c = i 610 break 611 if not c: 612 # None of the collections have names (?!), so just pick the 613 # first one. 614 c = items[0] 615 616 # list() doesn't return manifest text (and we don't want it to, 617 # because we don't need the same maninfest text sent to us 50 618 # times) so go and retrieve the collection object directly 619 # which will include the manifest text. 620 c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries) 621 else: 622 # Assume this is an actual collection uuid, so fetch it directly. 623 c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries) 624 625 # If a collection with this hash already exists at the 626 # destination, and 'force' is not true, just return that 627 # collection. 628 if not args.force: 629 if 'portable_data_hash' in c: 630 colhash = c['portable_data_hash'] 631 else: 632 colhash = c['uuid'] 633 dstcol = dst.collections().list( 634 filters=[['portable_data_hash', '=', colhash]] 635 ).execute(num_retries=args.retries) 636 if dstcol['items_available'] > 0: 637 for d in dstcol['items']: 638 if ((args.project_uuid == d['owner_uuid']) and 639 (c.get('name') == d['name']) and 640 (c['portable_data_hash'] == d['portable_data_hash'])): 641 return d 642 c['manifest_text'] = dst.collections().get( 643 uuid=dstcol['items'][0]['uuid'] 644 ).execute(num_retries=args.retries)['manifest_text'] 645 return create_collection_from(c, src, dst, args) 646 647 if args.replication is None: 648 # Obtain default or fallback collection replication setting on the 649 # destination 650 try: 651 args.replication = int(dst.config()["Collections"]["DefaultReplication"]) 652 except (KeyError, TypeError, ValueError): 653 args.replication = 2 654 655 # Fetch the collection's manifest. 656 manifest = c['manifest_text'] 657 logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest) 658 659 # Copy each block from src_keep to dst_keep. 660 # Use the newly signed locators returned from dst_keep to build 661 # a new manifest as we go. 
662 src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries) 663 dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries) 664 dst_manifest = io.StringIO() 665 dst_locators = {} 666 bytes_written = 0 667 bytes_expected = total_collection_size(manifest) 668 if args.progress: 669 progress_writer = ProgressWriter(human_progress) 670 else: 671 progress_writer = None 672 673 # go through the words 674 # put each block loc into 'get' queue 675 # 'get' threads get block and put it into 'put' queue 676 # 'put' threads put block and then update dst_locators 677 # 678 # after going through the whole manifest we go back through it 679 # again and build dst_manifest 680 681 lock = threading.Lock() 682 683 # the get queue should be unbounded because we'll add all the 684 # block hashes we want to get, but these are small 685 get_queue = queue.Queue() 686 687 threadcount = 4 688 689 # the put queue contains full data blocks 690 # and if 'get' is faster than 'put' we could end up consuming 691 # a great deal of RAM if it isn't bounded. 692 put_queue = queue.Queue(threadcount) 693 transfer_error = [] 694 695 def get_thread(): 696 while True: 697 word = get_queue.get() 698 if word is None: 699 put_queue.put(None) 700 get_queue.task_done() 701 return 702 703 blockhash = arvados.KeepLocator(word).md5sum 704 with lock: 705 if blockhash in dst_locators: 706 # Already uploaded 707 get_queue.task_done() 708 continue 709 710 try: 711 logger.debug("Getting block %s", word) 712 data = src_keep.get(word) 713 put_queue.put((word, data)) 714 except Exception as e: 715 logger.error("Error getting block %s: %s", word, e) 716 transfer_error.append(e) 717 try: 718 # Drain the 'get' queue so we end early 719 while True: 720 get_queue.get(False) 721 get_queue.task_done() 722 except queue.Empty: 723 pass 724 finally: 725 get_queue.task_done() 726 727 def put_thread(): 728 nonlocal bytes_written 729 while True: 730 item = put_queue.get() 731 if item is None: 732 put_queue.task_done() 733 return 734 735 word, data = item 736 loc = arvados.KeepLocator(word) 737 blockhash = loc.md5sum 738 with lock: 739 if blockhash in dst_locators: 740 # Already uploaded 741 put_queue.task_done() 742 continue 743 744 try: 745 logger.debug("Putting block %s (%s bytes)", blockhash, loc.size) 746 dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or [])) 747 with lock: 748 dst_locators[blockhash] = dst_locator 749 bytes_written += loc.size 750 if progress_writer: 751 progress_writer.report(obj_uuid, bytes_written, bytes_expected) 752 except Exception as e: 753 logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e) 754 try: 755 # Drain the 'get' queue so we end early 756 while True: 757 get_queue.get(False) 758 get_queue.task_done() 759 except queue.Empty: 760 pass 761 transfer_error.append(e) 762 finally: 763 put_queue.task_done() 764 765 for line in manifest.splitlines(): 766 words = line.split() 767 for word in words[1:]: 768 try: 769 loc = arvados.KeepLocator(word) 770 except ValueError: 771 # If 'word' can't be parsed as a locator, 772 # presume it's a filename. 
                continue

            get_queue.put(word)

    for i in range(0, threadcount):
        get_queue.put(None)

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    get_queue.join()
    put_queue.join()

    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest.write(words[0])
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
                continue
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)
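
# A minimal usage sketch for copy_collection() (illustrative only: the
# config identifiers, UUID, and argparse values below are placeholders,
# not part of this module; normally main() builds all of this from the
# command line):
#
#     src_arv = api_for_instance('src', 3)   # reads src.conf
#     dst_arv = api_for_instance('dst', 3)   # reads dst.conf
#     args = argparse.Namespace(
#         retries=3, force=False, progress=True, replication=None,
#         storage_classes=None, project_uuid='zzzzz-j7d0g-aaaaaaaaaaaaaaa')
#     newcol = copy_collection('zzzzz-4zz18-aaaaaaaaaaaaaaa', src_arv, dst_arv, args)
#     print(newcol['uuid'])
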
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
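
# Illustrative call (the image name and tag are placeholders): when a
# copied workflow depends on a container image, the dependency pass
# would invoke something like
#
#     copy_docker_image('myrepo/myimage', 'latest', src_arv, dst_arv, args)
#
# which resolves the image via its keepdocker links on the source and
# copies the backing collection to the destination.
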
def copy_project(obj_uuid, src, dst, owner_uuid, args):

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    else:
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        body={"group": {
                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record
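
# Sketch of how a caller might consume the result (the UUIDs are
# placeholders).  Errors in individual collections, workflows, or
# subprojects are accumulated in "partial_error" rather than aborting
# the whole copy:
#
#     result = copy_project('zzzzz-j7d0g-aaaaaaaaaaaaaaa',
#                           src_arv, dst_arv, dst_owner_uuid, args)
#     if result["partial_error"]:
#         logger.error("Some items failed to copy: %s", result["partial_error"])
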
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
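
# Examples of the dispatch above (illustrative; the UUIDs are
# placeholders, and the prefix-to-type mapping is resolved at runtime
# from the API discovery schema, so the last result assumes the usual
# Arvados convention that 'j7d0g' is the Group prefix):
#
#     uuid_type(api, 'd41d8cd98f00b204e9800998ecf8427e+0')  # => 'Collection' (Keep locator)
#     uuid_type(api, 'https://example.com/data.tar')        # => 'httpURL'
#     uuid_type(api, 'zzzzz-j7d0g-aaaaaaaaaaaaaaa')         # => 'Group' (via schema uuidPrefix)
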
def copy_from_http(url, src, dst, args):

    project_uuid = args.project_uuid
    prefer_cached_downloads = args.prefer_cached_downloads

    # If the URL has already been stored in Keep on the source cluster,
    # copy that collection instead of downloading the URL again.
    cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
                                           varying_url_params=args.varying_url_params,
                                           prefer_cached_downloads=prefer_cached_downloads)
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    # Otherwise, download the URL directly into Keep on the destination.
    cached = http_to_keep.http_to_keep(dst, project_uuid, url,
                                       varying_url_params=args.varying_url_params,
                                       prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}
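
# As used above, check_cached_url() and http_to_keep() return tuples in
# which index 2 carries the collection identifier.  A sketch of a call
# (the URL and parameter name are placeholders):
#
#     result = copy_from_http('https://example.com/genome.fa?token=xyz',
#                             src_arv, dst_arv, args)
#
# With args.varying_url_params == 'token', the 'token' query parameter
# is ignored when deciding whether the URL is already cached in Keep.
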
def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
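
# Worked example (the UUID is a placeholder): with 512 MiB of 1 GiB
# written,
#
#     human_progress('zzzzz-4zz18-aaaaaaaaaaaaaaa', 512 << 20, 1 << 30)
#
# returns "\rzzzzz-4zz18-aaaaaaaaaaaaaaa: 512M / 1024M 50.0% ".  The
# leading carriage return makes successive reports overwrite one
# another on the terminal.
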
class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")
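
# Typical use, as wired up in copy_collection() above (the UUID is a
# placeholder):
#
#     progress_writer = ProgressWriter(human_progress)
#     progress_writer.report('zzzzz-4zz18-aaaaaaaaaaaaaaa', 0, 1 << 30)
#     ...
#     progress_writer.finish()   # terminate the progress line with a newline
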