arvados.commands.arv_copy
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# arv-copy [--recursive] [--no-recursive] object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a workflow, arv-copy copies the workflow,
# input collections, and docker images). If
# --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have configuration files {src}.conf and
# {dst}.conf in a standard configuration directory with valid login credentials
# for instances src and dst. If either of these files is not found,
# arv-copy will issue an error.
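# Illustrative usage sketch (not part of the original module; the host
# names, tokens, and UUIDs below are hypothetical). A client configuration
# file such as ~/.config/arvados/src.conf holds KEY=VALUE settings:
#
#     ARVADOS_API_HOST=src01.example.org
#     ARVADOS_API_TOKEN=v2/src01-gj3su-0123456789abcde/secrettokenvalue
#
# With matching src.conf and dst.conf files in place, a collection could
# be copied with:
#
#     arv-copy --src src --dst dst --project-uuid dst01-j7d0g-0123456789abcde src01-4zz18-0123456789abcde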
import argparse
import contextlib
import getpass
import os
import re
import shutil
import subprocess
import sys
import logging
import tempfile
import urllib.parse
import io
import json
import queue
import threading
import errno

import httplib2.error
import googleapiclient

import arvados
import arvados.api
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
from arvados.logging import log_handler

from arvados._internal import basedirs, http_to_keep, s3_to_keep, to_keep_util
from arvados._version import __version__

COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

arvlogger = logging.getLogger('arvados')
keeplogger = logging.getLogger('arvados.keep')
logger = logging.getLogger('arvados.arv-copy')

# Set this up so connection errors get logged.
googleapi_logger = logging.getLogger('googleapiclient.http')

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None

def main():
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--src', dest='source_arvados',
        help="""
Client configuration location for the source Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
""",
    )
    copy_opts.add_argument(
        '--dst', dest='destination_arvados',
        help="""
Client configuration location for the destination Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will use the default client configuration from the environment or `settings.conf`.
""",
    )
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object, and subprojects. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies or subprojects.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
    copy_opts.add_argument(
        '--replication',
        type=arv_cmd.RangedValue(int, range(1, sys.maxsize)),
        metavar='N',
        help="""
Number of replicas per storage class for the copied collections at the destination.
If not provided (or if provided with invalid value),
use the destination's default replication-level setting (if found),
or the fallback value 2.
""")
    copy_opts.add_argument(
        '--storage-classes',
        type=arv_cmd.UniqueSplit(),
        help='Comma separated list of storage classes to be used when saving data to the destination Arvados instance.')
    copy_opts.add_argument(
        '--block-copy',
        dest='keep_block_copy',
        action='store_true',
        help="""Copy Keep blocks when copying collections (default).""",
    )
    copy_opts.add_argument(
        '--no-block-copy',
        dest='keep_block_copy',
        action='store_false',
        help="""Do not copy Keep blocks when copying collections. Must have
administrator privileges on the destination cluster to create collections.
""")

    copy_opts.add_argument(
        "--varying-url-params", type=str, default="",
        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")

    copy_opts.add_argument(
        "--prefer-cached-downloads", action="store_true", default=False,
        help="If an HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')

    copy_opts.set_defaults(
        # export_all_fields is used by external tools to make complete copies
        # of Arvados records.
        export_all_fields=False,
        keep_block_copy=True,
        progress=True,
        recursive=True,
    )

    parser = argparse.ArgumentParser(
        description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        arvlogger.setLevel(logging.DEBUG)
    else:
        arvlogger.setLevel(logging.INFO)
        keeplogger.setLevel(logging.WARNING)

    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
        args.source_arvados = args.object_uuid[:5]

    if not args.destination_arvados and args.project_uuid:
        args.destination_arvados = args.project_uuid[:5]

    # Make sure errors trying to connect to clusters get logged.
    googleapi_logger.setLevel(logging.WARN)
    googleapi_logger.addHandler(log_handler)

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados, args.retries)
    dst_arv = api_for_instance(args.destination_arvados, args.retries)

    # Once we've successfully contacted the clusters, we probably
    # don't want to see logging about retries (unless the user asked
    # for verbose output).
    if not args.verbose:
        googleapi_logger.setLevel(logging.ERROR)

    if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]:
        logger.info("Copying within cluster %s", src_arv.config()["ClusterID"])
    else:
        logger.info("Source cluster is %s", src_arv.config()["ClusterID"])
        logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"])

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)

    try:
        if t == 'Collection':
            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
            result = copy_collection(args.object_uuid,
                                     src_arv, dst_arv,
                                     args)
        elif t == 'Workflow':
            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
        elif t == 'Group':
            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
        elif t == 'httpURL' or t == 's3URL':
            result = copy_from_url(args.object_uuid, src_arv, dst_arv, args)
        else:
            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
    except Exception as e:
        logger.error("%s", e, exc_info=args.verbose)
        exit(1)

    if not result:
        exit(1)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if result is None or 'error_token' in result or 'uuid' not in result:
        if result:
            logger.error("API server returned an error result: {}".format(result))
        exit(1)

    print(result['uuid'])

    if result.get('partial_error'):
        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)

def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")
# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# settings.
#
# Otherwise, it is presumed to be the name of a file in a standard
# configuration directory.
#
def api_for_instance(instance_name, num_retries):
    msg = []
    if instance_name:
        if '/' in instance_name:
            config_file = instance_name
        else:
            dirs = basedirs.BaseDirectories('CONFIG')
            config_file = next(dirs.search(f'{instance_name}.conf'), '')

        arvados.api._reset_googleapiclient_logging()
        try:
            cfg = arvados.config.load(config_file)

            if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
                api_is_insecure = (
                    cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                        ['1', 't', 'true', 'y', 'yes']))
                return arvados.api('v1',
                                   host=cfg['ARVADOS_API_HOST'],
                                   token=cfg['ARVADOS_API_TOKEN'],
                                   insecure=api_is_insecure,
                                   num_retries=num_retries,
                                   )
            else:
                msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file))
        except OSError as e:
            if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH):
                verb = 'connect to instance from'
            elif config_file:
                verb = 'open'
            else:
                verb = 'find'
                searchlist = ":".join(str(p) for p in dirs.search_paths())
                config_file = f'{instance_name}.conf in path {searchlist}'
            msg.append(("Could not {} config file {}: {}").format(
                verb, config_file, e.strerror))
        except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e:
            msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e))

    arvados.api._reset_googleapiclient_logging()
    default_api = None
    default_instance = None
    try:
        default_api = arvados.api('v1', num_retries=num_retries)
        default_instance = default_api.config()["ClusterID"]
    except ValueError:
        pass
    except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e:
        msg.append("Failed to connect to default instance, error was {}".format(e))

    if default_api is not None and (not instance_name or instance_name == default_instance):
        # Use default settings
        return default_api

    if instance_name and default_instance and instance_name != default_instance:
        msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name))

    for m in msg:
        logger.error(m)

    abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')
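# Illustrative calls (not part of the original module) showing how
# api_for_instance() above resolves its instance_name argument; the paths
# and names here are hypothetical:
#
#     api_for_instance('/tmp/staging.conf', 3)  # contains '/', so treated as a literal file path
#     api_for_instance('src', 3)                # searched as src.conf in the standard config directories
#     api_for_instance(None, 3)                 # falls back to the default client configuration
#
# Each call returns an ordinary client object from arvados.api('v1', ...).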
# Check if git is available
def check_git_availability():
    try:
        subprocess.run(
            ['git', '--version'],
            check=True,
            stdout=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        abort('git command is not available. Please ensure git is installed.')


def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, str):
        yield arg
    else:
        yield from arg

def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
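# Worked example (illustrative only; the repository names are hypothetical):
#
#     f = ['repository', '=', 'old/repo']
#     migrate_repository_filter(f, 'old/repo', 'new/repo')
#     # f is now ['repository', '=', 'new/repo']
#
# A filter like ['repository', 'like', '%repo%'] is not a simple source
# match, so migrate_repository_filter() raises ValueError for it.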
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")

def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))

@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)
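# Usage sketch for exception_handler() (illustrative; any callable works as
# the handler, and risky_cleanup is a hypothetical function):
#
#     with exception_handler(logger.warning, OSError, ValueError):
#         risky_cleanup()  # an OSError or ValueError here is logged, not raised
#
# Exception types outside exc_types still propagate normally.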
# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        try:
            result = subprocess.run(
                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
                env=env,
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
        except FileNotFoundError:
            no_arv_copy = True
        else:
            no_arv_copy = result.returncode == 2

        if no_arv_copy:
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        locations = json.loads(result.stdout)

        if locations:
            copy_collections(locations, src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid

    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    else:
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)


def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)

# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst.  obj may be a dict or a list, in which case we run
# copy_collections on every value it contains.  If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
#     "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).

        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, str):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj
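# Illustrative call (the embedded hash is borrowed from the comment above):
#
#     s = '$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)'
#     copy_collections(s, src, dst, args)
#     # returns the string with the portable data hash replaced by the
#     # destination collection's UUID (or unchanged if the copy kept the
#     # same identifier), using collections_copied to avoid repeat copies.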
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
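# Worked example (illustrative; the locator is hypothetical): a block that
# appears twice in the manifest is only counted once.
#
#     m = ". acbd18db4cc2f85cedef654fccc4a4d8+1024 acbd18db4cc2f85cedef654fccc4a4d8+1024 0:2048:file.bin\n"
#     total_collection_size(m)  # -> 1024, not 2048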
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    if args.export_all_fields:
        body = c.copy()
    else:
        body = {key: c[key] for key in [
            'description',
            'manifest_text',
            'name',
            'portable_data_hash',
            'properties',
        ]}
    if not body['name']:
        body['name'] = f"copied from {collection_uuid}"
    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes
    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection

# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks.  Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection.  As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    if args.replication is None:
        # Obtain default or fallback collection replication setting on the
        # destination
        try:
            args.replication = int(dst.config()["Collections"]["DefaultReplication"])
        except (KeyError, TypeError, ValueError):
            args.replication = 2

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = src.keep
    dst_keep = dst.keep
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest

    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    threadcount = 4

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()

    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()

    if args.keep_block_copy:
        for line in manifest.splitlines():
            words = line.split()
            for word in words[1:]:
                try:
                    loc = arvados.KeepLocator(word)
                except ValueError:
                    # If 'word' can't be parsed as a locator,
                    # presume it's a filename.
                    continue

                get_queue.put(word)

        for i in range(0, threadcount):
            get_queue.put(None)

        for i in range(0, threadcount):
            threading.Thread(target=get_thread, daemon=True).start()

        for i in range(0, threadcount):
            threading.Thread(target=put_thread, daemon=True).start()

        get_queue.join()
        put_queue.join()

        if len(transfer_error) > 0:
            return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = iter(line.split())
        out_words = [next(words)]
        for word in words:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                out_words.append(word)
            else:
                if args.keep_block_copy:
                    out_words.append(dst_locators[loc.md5sum])
                else:
                    out_words.append(loc.stripped())
        dst_manifest.write(' '.join(out_words))
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)
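# Sketch of the manifest rewrite performed by copy_collection() above
# (hypothetical block locator; the signature parts are placeholders):
#
#     src manifest:  . acbd18db4cc2f85cedef654fccc4a4d8+3+A<src signature>@<expiry> 0:3:foo.txt
#     dst manifest:  . acbd18db4cc2f85cedef654fccc4a4d8+3+A<dst signature>@<expiry> 0:3:foo.txt
#
# Only the permission signatures change; the hash+size part of each locator
# is preserved, so the collection's portable data hash stays the same.
# With --no-block-copy the stripped locator acbd18db4cc2f85cedef654fccc4a4d8+3
# is written instead and no data blocks are transferred.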
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst.  Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

def copy_project(obj_uuid, src, dst, owner_uuid, args):

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    try:
        existing_uuid = existing['items'][0]['uuid']
    except IndexError:
        body = src_project_record if args.export_all_fields else {'group': {
            'description': src_project_record['description'],
            'group_class': 'project',
            'name': src_project_record['name'],
            'owner_uuid': owner_uuid,
        }}
        project_req = dst.groups().create(body=body)
    else:
        project_req = dst.groups().update(
            uuid=existing_uuid,
            body={'group': {
                'description': src_project_record['description'],
            }},
        )

    project_record = project_req.execute(num_retries=args.retries)
    args.project_uuid = project_record["uuid"]
    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""
    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record
# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository)
#
def git_rev_parse(rev, repo):
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        text=True,
    )
    # With text=True, subprocess.run() captures stdout as a str, so use it
    # directly (calling .read() on it would raise AttributeError).
    return proc.stdout.strip()

# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second field of the uuid.  This function consults the api's
# schema to identify the object class.
#
# It returns a string such as 'Collection', 'Workflow', etc.
#
# Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    if object_uuid.startswith("s3:"):
        return 's3URL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
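# Examples (illustrative; '4zz18' is the standard Arvados schema prefix for
# collections, the other identifiers are hypothetical):
#
#     uuid_type(api, 'acbd18db4cc2f85cedef654fccc4a4d8+3')    # -> 'Collection' (Keep locator)
#     uuid_type(api, 'zzzzz-4zz18-0123456789abcde')           # -> 'Collection' (schema lookup)
#     uuid_type(api, 'https://example.com/data.tar.gz')       # -> 'httpURL'
#     uuid_type(api, 's3://example-bucket/data.tar.gz')       # -> 's3URL'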
def copy_from_url(url, src, dst, args):

    project_uuid = args.project_uuid
    # Ensure string of varying parameters is well-formed
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = to_keep_util.CheckCacheResult(None, None, None, None, None)

    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
                                               varying_url_params=args.varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        import boto3.session
        botosession = boto3.session.Session()
        cached = s3_to_keep.check_cached_url(src, botosession, project_uuid, url, {},
                                             prefer_cached_downloads=prefer_cached_downloads)

    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.http_to_keep(dst, project_uuid, url,
                                           varying_url_params=args.varying_url_params,
                                           prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        cached = s3_to_keep.s3_to_keep(dst, botosession, project_uuid, url,
                                       prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}


def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)


# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)

class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

if __name__ == '__main__':
    main()
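# Sample of the human-readable progress line written to stderr by
# ProgressWriter(human_progress) above, for a hypothetical collection after
# 5 MiB of an expected 10 MiB has been written:
#
#     zzzzz-4zz18-0123456789abcde: 5M / 10M 50.0%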
73def main(): 74 copy_opts = argparse.ArgumentParser(add_help=False) 75 76 copy_opts.add_argument( 77 '--version', action='version', version="%s %s" % (sys.argv[0], __version__), 78 help='Print version and exit.') 79 copy_opts.add_argument( 80 '-v', '--verbose', dest='verbose', action='store_true', 81 help='Verbose output.') 82 copy_opts.add_argument( 83 '--progress', dest='progress', action='store_true', 84 help='Report progress on copying collections. (default)') 85 copy_opts.add_argument( 86 '--no-progress', dest='progress', action='store_false', 87 help='Do not report progress on copying collections.') 88 copy_opts.add_argument( 89 '-f', '--force', dest='force', action='store_true', 90 help='Perform copy even if the object appears to exist at the remote destination.') 91 copy_opts.add_argument( 92 '--src', dest='source_arvados', 93 help=""" 94Client configuration location for the source Arvados cluster. 95May be either a configuration file path, or a plain identifier like `foo` 96to search for a configuration file `foo.conf` under a systemd or XDG configuration directory. 97If not provided, will search for a configuration file named after the cluster ID of the source object UUID. 98""", 99 ) 100 copy_opts.add_argument( 101 '--dst', dest='destination_arvados', 102 help=""" 103Client configuration location for the destination Arvados cluster. 104May be either a configuration file path, or a plain identifier like `foo` 105to search for a configuration file `foo.conf` under a systemd or XDG configuration directory. 106If not provided, will use the default client configuration from the environment or `settings.conf`. 107""", 108 ) 109 copy_opts.add_argument( 110 '--recursive', dest='recursive', action='store_true', 111 help='Recursively copy any dependencies for this object, and subprojects. (default)') 112 copy_opts.add_argument( 113 '--no-recursive', dest='recursive', action='store_false', 114 help='Do not copy any dependencies or subprojects.') 115 copy_opts.add_argument( 116 '--project-uuid', dest='project_uuid', 117 help='The UUID of the project at the destination to which the collection or workflow should be copied.') 118 copy_opts.add_argument( 119 '--replication', 120 type=arv_cmd.RangedValue(int, range(1, sys.maxsize)), 121 metavar='N', 122 help=""" 123Number of replicas per storage class for the copied collections at the destination. 124If not provided (or if provided with invalid value), 125use the destination's default replication-level setting (if found), 126or the fallback value 2. 127""") 128 copy_opts.add_argument( 129 '--storage-classes', 130 type=arv_cmd.UniqueSplit(), 131 help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.') 132 copy_opts.add_argument( 133 '--block-copy', 134 dest='keep_block_copy', 135 action='store_true', 136 help="""Copy Keep blocks when copying collections (default).""", 137 ) 138 copy_opts.add_argument( 139 '--no-block-copy', 140 dest='keep_block_copy', 141 action='store_false', 142 help="""Do not copy Keep blocks when copying collections. Must have 143administrator privileges on the destination cluster to create collections. 
144""") 145 146 copy_opts.add_argument("--varying-url-params", type=str, default="", 147 help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.") 148 149 copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False, 150 help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).") 151 copy_opts.add_argument( 152 'object_uuid', 153 help='The UUID of the object to be copied.') 154 155 copy_opts.set_defaults( 156 # export_all_fields is used by external tools to make complete copies 157 # of Arvados records. 158 export_all_fields=False, 159 keep_block_copy=True, 160 progress=True, 161 recursive=True, 162 ) 163 164 parser = argparse.ArgumentParser( 165 description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.', 166 parents=[copy_opts, arv_cmd.retry_opt]) 167 args = parser.parse_args() 168 169 if args.verbose: 170 arvlogger.setLevel(logging.DEBUG) 171 else: 172 arvlogger.setLevel(logging.INFO) 173 keeplogger.setLevel(logging.WARNING) 174 175 if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid): 176 args.source_arvados = args.object_uuid[:5] 177 178 if not args.destination_arvados and args.project_uuid: 179 args.destination_arvados = args.project_uuid[:5] 180 181 # Make sure errors trying to connect to clusters get logged. 182 googleapi_logger.setLevel(logging.WARN) 183 googleapi_logger.addHandler(log_handler) 184 185 # Create API clients for the source and destination instances 186 src_arv = api_for_instance(args.source_arvados, args.retries) 187 dst_arv = api_for_instance(args.destination_arvados, args.retries) 188 189 # Once we've successfully contacted the clusters, we probably 190 # don't want to see logging about retries (unless the user asked 191 # for verbose output). 192 if not args.verbose: 193 googleapi_logger.setLevel(logging.ERROR) 194 195 if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]: 196 logger.info("Copying within cluster %s", src_arv.config()["ClusterID"]) 197 else: 198 logger.info("Source cluster is %s", src_arv.config()["ClusterID"]) 199 logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"]) 200 201 if not args.project_uuid: 202 args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"] 203 204 # Identify the kind of object we have been given, and begin copying. 
205 t = uuid_type(src_arv, args.object_uuid) 206 207 try: 208 if t == 'Collection': 209 set_src_owner_uuid(src_arv.collections(), args.object_uuid, args) 210 result = copy_collection(args.object_uuid, 211 src_arv, dst_arv, 212 args) 213 elif t == 'Workflow': 214 set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args) 215 result = copy_workflow(args.object_uuid, src_arv, dst_arv, args) 216 elif t == 'Group': 217 set_src_owner_uuid(src_arv.groups(), args.object_uuid, args) 218 result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args) 219 elif t == 'httpURL' or t == 's3URL': 220 result = copy_from_url(args.object_uuid, src_arv, dst_arv, args) 221 else: 222 abort("cannot copy object {} of type {}".format(args.object_uuid, t)) 223 except Exception as e: 224 logger.error("%s", e, exc_info=args.verbose) 225 exit(1) 226 227 if not result: 228 exit(1) 229 230 # If no exception was thrown and the response does not have an 231 # error_token field, presume success 232 if result is None or 'error_token' in result or 'uuid' not in result: 233 if result: 234 logger.error("API server returned an error result: {}".format(result)) 235 exit(1) 236 237 print(result['uuid']) 238 239 if result.get('partial_error'): 240 logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error'])) 241 exit(1) 242 243 logger.info("Success: created copy with uuid {}".format(result['uuid'])) 244 exit(0)
263def api_for_instance(instance_name, num_retries): 264 msg = [] 265 if instance_name: 266 if '/' in instance_name: 267 config_file = instance_name 268 else: 269 dirs = basedirs.BaseDirectories('CONFIG') 270 config_file = next(dirs.search(f'{instance_name}.conf'), '') 271 272 arvados.api._reset_googleapiclient_logging() 273 try: 274 cfg = arvados.config.load(config_file) 275 276 if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg: 277 api_is_insecure = ( 278 cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set( 279 ['1', 't', 'true', 'y', 'yes'])) 280 return arvados.api('v1', 281 host=cfg['ARVADOS_API_HOST'], 282 token=cfg['ARVADOS_API_TOKEN'], 283 insecure=api_is_insecure, 284 num_retries=num_retries, 285 ) 286 else: 287 msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file)) 288 except OSError as e: 289 if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH): 290 verb = 'connect to instance from' 291 elif config_file: 292 verb = 'open' 293 else: 294 verb = 'find' 295 searchlist = ":".join(str(p) for p in dirs.search_paths()) 296 config_file = f'{instance_name}.conf in path {searchlist}' 297 msg.append(("Could not {} config file {}: {}").format( 298 verb, config_file, e.strerror)) 299 except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e: 300 msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e)) 301 302 arvados.api._reset_googleapiclient_logging() 303 default_api = None 304 default_instance = None 305 try: 306 default_api = arvados.api('v1', num_retries=num_retries) 307 default_instance = default_api.config()["ClusterID"] 308 except ValueError: 309 pass 310 except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e: 311 msg.append("Failed to connect to default instance, error was {}".format(e)) 312 313 if default_api is not None and (not instance_name or instance_name == default_instance): 314 # Use default settings 315 return default_api 316 317 if instance_name and default_instance and instance_name != default_instance: 318 msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name)) 319 320 for m in msg: 321 logger.error(m) 322 323 abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')
337def filter_iter(arg): 338 """Iterate a filter string-or-list. 339 340 Pass in a filter field that can either be a string or list. 341 This will iterate elements as if the field had been written as a list. 342 """ 343 if isinstance(arg, str): 344 yield arg 345 else: 346 yield from arg
Iterate a filter string-or-list.
Pass in a filter field that can either be a string or list. This will iterate elements as if the field had been written as a list.
348def migrate_repository_filter(repo_filter, src_repository, dst_repository): 349 """Update a single repository filter in-place for the destination. 350 351 If the filter checks that the repository is src_repository, it is 352 updated to check that the repository is dst_repository. If it does 353 anything else, this function raises ValueError. 354 """ 355 if src_repository is None: 356 raise ValueError("component does not specify a source repository") 357 elif dst_repository is None: 358 raise ValueError("no destination repository specified to update repository filter") 359 elif repo_filter[1:] == ['=', src_repository]: 360 repo_filter[2] = dst_repository 361 elif repo_filter[1:] == ['in', [src_repository]]: 362 repo_filter[2] = [dst_repository] 363 else: 364 raise ValueError("repository filter is not a simple source match")
Update a single repository filter in-place for the destination.
If the filter checks that the repository is src_repository, it is updated to check that the repository is dst_repository. If it does anything else, this function raises ValueError.
366def migrate_script_version_filter(version_filter): 367 """Update a single script_version filter in-place for the destination. 368 369 Currently this function checks that all the filter operands are Git 370 commit hashes. If they're not, it raises ValueError to indicate that 371 the filter is not portable. It could be extended to make other 372 transformations in the future. 373 """ 374 if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])): 375 raise ValueError("script_version filter is not limited to commit hashes")
Update a single script_version filter in-place for the destination.
Currently this function checks that all the filter operands are Git commit hashes. If they’re not, it raises ValueError to indicate that the filter is not portable. It could be extended to make other transformations in the future.
377def attr_filtered(filter_, *attr_names): 378 """Return True if filter_ applies to any of attr_names, else False.""" 379 return any((name == 'any') or (name in attr_names) 380 for name in filter_iter(filter_[0]))
Return True if filter_ applies to any of attr_names, else False.
382@contextlib.contextmanager 383def exception_handler(handler, *exc_types): 384 """If any exc_types are raised in the block, call handler on the exception.""" 385 try: 386 yield 387 except exc_types as error: 388 handler(error)
If any exc_types are raised in the block, call handler on the exception.
403def copy_workflow(wf_uuid, src, dst, args): 404 # fetch the workflow from the source instance 405 wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries) 406 407 if not wf["definition"]: 408 logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid)) 409 410 # copy collections and docker images 411 if args.recursive and wf["definition"]: 412 env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc, 413 "ARVADOS_API_TOKEN": src.api_token, 414 "PATH": os.environ["PATH"]} 415 try: 416 result = subprocess.run( 417 ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid], 418 env=env, 419 stdout=subprocess.PIPE, 420 universal_newlines=True, 421 ) 422 except FileNotFoundError: 423 no_arv_copy = True 424 else: 425 no_arv_copy = result.returncode == 2 426 427 if no_arv_copy: 428 raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.') 429 elif result.returncode != 0: 430 raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps') 431 432 locations = json.loads(result.stdout) 433 434 if locations: 435 copy_collections(locations, src, dst, args) 436 437 # copy the workflow itself 438 del wf['uuid'] 439 wf['owner_uuid'] = args.project_uuid 440 441 existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid], 442 ["name", "=", wf["name"]]]).execute() 443 if len(existing["items"]) == 0: 444 return dst.workflows().create(body=wf).execute(num_retries=args.retries) 445 else: 446 return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
449def workflow_collections(obj, locations, docker_images): 450 if isinstance(obj, dict): 451 loc = obj.get('location', None) 452 if loc is not None: 453 if loc.startswith("keep:"): 454 locations.append(loc[5:]) 455 456 docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None) 457 if docker_image is not None: 458 ds = docker_image.split(":", 1) 459 tag = ds[1] if len(ds)==2 else 'latest' 460 docker_images[ds[0]] = tag 461 462 for x in obj: 463 workflow_collections(obj[x], locations, docker_images) 464 elif isinstance(obj, list): 465 for x in obj: 466 workflow_collections(x, locations, docker_images)
480def copy_collections(obj, src, dst, args): 481 482 def copy_collection_fn(collection_match): 483 """Helper function for regex substitution: copies a single collection, 484 identified by the collection_match MatchObject, to the 485 destination. Returns the destination collection uuid (or the 486 portable data hash if that's what src_id is). 487 488 """ 489 src_id = collection_match.group(0) 490 if src_id not in collections_copied: 491 dst_col = copy_collection(src_id, src, dst, args) 492 if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]: 493 collections_copied[src_id] = src_id 494 else: 495 collections_copied[src_id] = dst_col['uuid'] 496 return collections_copied[src_id] 497 498 if isinstance(obj, str): 499 # Copy any collections identified in this string to dst, replacing 500 # them with the dst uuids as necessary. 501 obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj) 502 obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj) 503 return obj 504 elif isinstance(obj, dict): 505 return type(obj)((v, copy_collections(obj[v], src, dst, args)) 506 for v in obj) 507 elif isinstance(obj, list): 508 return type(obj)(copy_collections(v, src, dst, args) for v in obj) 509 return obj
512def total_collection_size(manifest_text): 513 """Return the total number of bytes in this collection (excluding 514 duplicate blocks).""" 515 516 total_bytes = 0 517 locators_seen = {} 518 for line in manifest_text.splitlines(): 519 words = line.split() 520 for word in words[1:]: 521 try: 522 loc = arvados.KeepLocator(word) 523 except ValueError: 524 continue # this word isn't a locator, skip it 525 if loc.md5sum not in locators_seen: 526 locators_seen[loc.md5sum] = True 527 total_bytes += loc.size 528 529 return total_bytes
Return the total number of bytes in this collection (excluding duplicate blocks).
531def create_collection_from(c, src, dst, args): 532 """Create a new collection record on dst, and copy Docker metadata if 533 available.""" 534 535 collection_uuid = c['uuid'] 536 if args.export_all_fields: 537 body = c.copy() 538 else: 539 body = {key: c[key] for key in [ 540 'description', 541 'manifest_text', 542 'name', 543 'portable_data_hash', 544 'properties', 545 ]} 546 if not body['name']: 547 body['name'] = f"copied from {collection_uuid}" 548 if args.storage_classes: 549 body['storage_classes_desired'] = args.storage_classes 550 body['owner_uuid'] = args.project_uuid 551 552 dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries) 553 554 # Create docker_image_repo+tag and docker_image_hash links 555 # at the destination. 556 for link_class in ("docker_image_repo+tag", "docker_image_hash"): 557 docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items'] 558 559 for src_link in docker_links: 560 body = {key: src_link[key] 561 for key in ['link_class', 'name', 'properties']} 562 body['head_uuid'] = dst_collection['uuid'] 563 body['owner_uuid'] = args.project_uuid 564 565 lk = dst.links().create(body=body).execute(num_retries=args.retries) 566 logger.debug('created dst link {}'.format(lk)) 567 568 return dst_collection
Create a new collection record on dst, and copy Docker metadata if available.
def copy_collection(obj_uuid, src, dst, args):
    """Copy the collection identified by obj_uuid (a collection UUID or a
    portable data hash) from src to dst, copying the underlying Keep
    blocks unless --no-block-copy was given."""
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If obj_uuid is a portable data hash, it might not uniquely
        # identify a particular collection, so it is ambiguous which
        # name to use for the copy. Apply some heuristics to pick
        # which collection to take the name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project,
            # so pick the oldest collection that has a name assigned
            # to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick
            # the first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it
        # to, because we don't need the same manifest text sent to us
        # 50 times) so go and retrieve the collection object directly,
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    if args.replication is None:
        # Obtain the default or fallback collection replication
        # setting on the destination.
        try:
            args.replication = int(dst.config()["Collections"]["DefaultReplication"])
        except (KeyError, TypeError, ValueError):
            args.replication = 2

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = src.keep
    dst_keep = dst.keep
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    # Pipeline: walk the manifest and put each block locator on the
    # 'get' queue; 'get' threads fetch blocks and hand them to the
    # 'put' queue; 'put' threads store blocks at the destination and
    # record the new locators in dst_locators.
    #
    # After the whole manifest has been processed, walk it a second
    # time and build dst_manifest from dst_locators.

    lock = threading.Lock()

    # The 'get' queue can be unbounded: we'll add every block locator
    # we want to fetch, but locators are small.
    get_queue = queue.Queue()

    threadcount = 4

    # The 'put' queue contains full data blocks, and if 'get' is
    # faster than 'put' we could end up consuming a great deal of RAM
    # if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()

    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()

    if args.keep_block_copy:
        for line in manifest.splitlines():
            words = line.split()
            for word in words[1:]:
                try:
                    loc = arvados.KeepLocator(word)
                except ValueError:
                    # If 'word' can't be parsed as a locator,
                    # presume it's a filename.
                    continue

                get_queue.put(word)

        for i in range(0, threadcount):
            get_queue.put(None)

        for i in range(0, threadcount):
            threading.Thread(target=get_thread, daemon=True).start()

        for i in range(0, threadcount):
            threading.Thread(target=put_thread, daemon=True).start()

        get_queue.join()
        put_queue.join()

        if len(transfer_error) > 0:
            return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = iter(line.split())
        out_words = [next(words)]
        for word in words:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                out_words.append(word)
            else:
                if args.keep_block_copy:
                    out_words.append(dst_locators[loc.md5sum])
                else:
                    out_words.append(loc.stripped())
        dst_manifest.write(' '.join(out_words))
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)

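# Illustrative example of the manifest rewrite above (hashes and
# signatures below are made up): each locator is replaced via
# dst_locators, keyed on its bare md5 hash, so the saved manifest
# carries tokens signed by the destination cluster.
#
#   source manifest line:
#     . acbd18db4cc2f85cedef654fccc4a4d8+3+Aaaaaaaaa@11111111 0:3:foo.txt
#   with --block-copy (default), rewritten with the dst-signed locator:
#     . acbd18db4cc2f85cedef654fccc4a4d8+3+Abbbbbbbb@22222222 0:3:foo.txt
#   with --no-block-copy, the locator is merely stripped:
#     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
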
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

def copy_project(obj_uuid, src, dst, owner_uuid, args):
    """Copy the project identified by obj_uuid from src to dst, creating
    or reusing a same-named project under owner_uuid, then copy the
    collections, workflows, and (if args.recursive) subprojects it
    contains."""

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    try:
        existing_uuid = existing['items'][0]['uuid']
    except IndexError:
        body = src_project_record if args.export_all_fields else {'group': {
            'description': src_project_record['description'],
            'group_class': 'project',
            'name': src_project_record['name'],
            'owner_uuid': owner_uuid,
        }}
        project_req = dst.groups().create(body=body)
    else:
        project_req = dst.groups().update(
            uuid=existing_uuid,
            body={'group': {
                'description': src_project_record['description'],
            }},
        )

    project_record = project_req.execute(num_retries=args.retries)
    args.project_uuid = project_record["uuid"]
    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    # Copy subprojects
    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record

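# Note that copy_project accumulates failures in partial_error rather
# than aborting: one collection, workflow, or subproject that fails to
# copy does not stop the rest of the project copy, and the caller can
# inspect project_record["partial_error"] afterwards.
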
def uuid_type(api, object_uuid):
    """Classify object_uuid: 'Collection' for Keep locators,
    'httpURL'/'s3URL' for URLs, otherwise the schema name whose
    uuidPrefix matches the UUID's type prefix (or None)."""
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    if object_uuid.startswith("s3:"):
        return 's3URL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None

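# Illustrative example: Arvados UUIDs have the form
# <cluster id>-<type prefix>-<15 alphanumerics>, so the middle field
# selects the matching schema entry. With hypothetical UUIDs:
#
#     uuid_type(api, 'zzzzz-4zz18-0123456789abcde')  # -> 'Collection'
#     uuid_type(api, 'zzzzz-j7d0g-0123456789abcde')  # -> 'Group'
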
def copy_from_url(url, src, dst, args):
    """Store the contents of an http(s) or s3 URL as a Keep collection at
    the destination, reusing a previously cached copy if one exists."""

    project_uuid = args.project_uuid
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = to_keep_util.CheckCacheResult(None, None, None, None, None)

    # First, check whether this URL has already been stored in Keep on
    # the source cluster.
    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
                                               varying_url_params=args.varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        import boto3.session
        botosession = boto3.session.Session()
        cached = s3_to_keep.check_cached_url(src, botosession, project_uuid, url, {},
                                             prefer_cached_downloads=prefer_cached_downloads)

    if cached[2] is not None:
        # Found a cached collection on the source, so copy that.
        return copy_collection(cached[2], src, dst, args)

    # Cache miss: download the URL directly to the destination.
    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.http_to_keep(dst, project_uuid, url,
                                           varying_url_params=args.varying_url_params,
                                           prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        cached = s3_to_keep.s3_to_keep(dst, botosession, project_uuid, url,
                                       prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}

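# This path handles the case where the positional object-uuid argument
# given to arv-copy is itself a URL, e.g. (hypothetical URL and project
# UUID):
#
#     arv-copy --project-uuid zzzzz-j7d0g-0123456789abcde \
#         https://example.com/dataset.tar.gz
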
def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)

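# Illustrative example (hypothetical UUID): for a 100 MiB collection
# with 25 MiB transferred so far,
#
#     human_progress('zzzzz-4zz18-0123456789abcde', 25 << 20, 100 << 20)
#
# returns '\rzzzzz-4zz18-0123456789abcde: 25M / 100M 25.0% '; the
# leading carriage return lets successive reports overwrite a single
# status line instead of scrolling.
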
class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

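# Minimal usage sketch (hypothetical UUID):
#
#     pw = ProgressWriter(human_progress)
#     pw.report('zzzzz-4zz18-0123456789abcde', 25 << 20, 100 << 20)
#     pw.finish()  # terminate the \r-overwritten status line with a newline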