arvados.commands.arv_copy
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# arv-copy [--recursive] [--no-recursive] object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a workflow, arv-copy copies the workflow,
# input collections, and docker images). If
# --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have configuration files {src}.conf and
# {dst}.conf in a standard configuration directory with valid login credentials
# for instances src and dst. If either of these files is not found,
# arv-copy will issue an error.

import argparse
import contextlib
import getpass
import os
import re
import shutil
import subprocess
import sys
import logging
import tempfile
import urllib.parse
import io
import json
import queue
import threading
import errno

import httplib2.error
import googleapiclient

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
from arvados.logging import log_handler

from arvados._internal import basedirs, http_to_keep, s3_to_keep, to_keep_util
from arvados._version import __version__

COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

arvlogger = logging.getLogger('arvados')
keeplogger = logging.getLogger('arvados.keep')
logger = logging.getLogger('arvados.arv-copy')

# Set this up so connection errors get logged.
googleapi_logger = logging.getLogger('googleapiclient.http')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# directories.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
#
local_repo_dir = {}

# List of collections that have been copied in this session, and their
# destination collection UUIDs.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None

def main():
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--src', dest='source_arvados',
        help="""
Client configuration location for the source Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will search for a configuration file named after the cluster ID of the source object UUID.
""",
    )
    copy_opts.add_argument(
        '--dst', dest='destination_arvados',
        help="""
Client configuration location for the destination Arvados cluster.
May be either a configuration file path, or a plain identifier like `foo`
to search for a configuration file `foo.conf` under a systemd or XDG configuration directory.
If not provided, will use the default client configuration from the environment or `settings.conf`.
""",
    )
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object, and subprojects. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies or subprojects.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the collection or workflow should be copied.')
    copy_opts.add_argument(
        '--replication',
        type=arv_cmd.RangedValue(int, range(1, sys.maxsize)),
        metavar='N',
        help="""
Number of replicas per storage class for the copied collections at the destination.
If not provided (or if provided with invalid value),
use the destination's default replication-level setting (if found),
or the fallback value 2.
""")
    copy_opts.add_argument(
        '--storage-classes',
        type=arv_cmd.UniqueSplit(),
        help='Comma separated list of storage classes to be used when saving data to the destination Arvados instance.')
    copy_opts.add_argument("--varying-url-params", type=str, default="",
                        help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")

    copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False,
                        help="If an HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")

    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        arvlogger.setLevel(logging.DEBUG)
    else:
        arvlogger.setLevel(logging.INFO)
        keeplogger.setLevel(logging.WARNING)

    if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid):
        args.source_arvados = args.object_uuid[:5]

    if not args.destination_arvados and args.project_uuid:
        args.destination_arvados = args.project_uuid[:5]

    # Make sure errors trying to connect to clusters get logged.
    googleapi_logger.setLevel(logging.WARN)
    googleapi_logger.addHandler(log_handler)

    # Create API clients for the source and destination instances
    src_arv = api_for_instance(args.source_arvados, args.retries)
    dst_arv = api_for_instance(args.destination_arvados, args.retries)

    # Once we've successfully contacted the clusters, we probably
    # don't want to see logging about retries (unless the user asked
    # for verbose output).
    if not args.verbose:
        googleapi_logger.setLevel(logging.ERROR)

    if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]:
        logger.info("Copying within cluster %s", src_arv.config()["ClusterID"])
    else:
        logger.info("Source cluster is %s", src_arv.config()["ClusterID"])
        logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"])

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)

    try:
        if t == 'Collection':
            set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
            result = copy_collection(args.object_uuid,
                                     src_arv, dst_arv,
                                     args)
        elif t == 'Workflow':
            set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
            result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
        elif t == 'Group':
            set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
            result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
        elif t == 'httpURL' or t == 's3URL':
            result = copy_from_url(args.object_uuid, src_arv, dst_arv, args)
        else:
            abort("cannot copy object {} of type {}".format(args.object_uuid, t))
    except Exception as e:
        logger.error("%s", e, exc_info=args.verbose)
        exit(1)

    # Clean up any outstanding temp git repositories.
    for d in local_repo_dir.values():
        shutil.rmtree(d, ignore_errors=True)

    if not result:
        exit(1)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if result is None or 'error_token' in result or 'uuid' not in result:
        if result:
            logger.error("API server returned an error result: {}".format(result))
        exit(1)

    print(result['uuid'])

    if result.get('partial_error'):
        logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
        exit(1)

    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)

def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")

# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either local or absolute) to a file with Arvados configuration
# settings.
#
# Otherwise, it is presumed to be the name of a file in a standard
# configuration directory.
#
def api_for_instance(instance_name, num_retries):
    msg = []
    if instance_name:
        if '/' in instance_name:
            config_file = instance_name
        else:
            dirs = basedirs.BaseDirectories('CONFIG')
            config_file = next(dirs.search(f'{instance_name}.conf'), '')

        try:
            cfg = arvados.config.load(config_file)

            if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
                api_is_insecure = (
                    cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                        ['1', 't', 'true', 'y', 'yes']))
                return arvados.api('v1',
                                   host=cfg['ARVADOS_API_HOST'],
                                   token=cfg['ARVADOS_API_TOKEN'],
                                   insecure=api_is_insecure,
                                   num_retries=num_retries,
                                   )
            else:
                msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file))
        except OSError as e:
            if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH):
                verb = 'connect to instance from'
            elif config_file:
                verb = 'open'
            else:
                verb = 'find'
                searchlist = ":".join(str(p) for p in dirs.search_paths())
                config_file = f'{instance_name}.conf in path {searchlist}'
            msg.append(("Could not {} config file {}: {}").format(
                verb, config_file, e.strerror))
        except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e:
            msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e))

    default_api = None
    default_instance = None
    try:
        default_api = arvados.api('v1', num_retries=num_retries)
        default_instance = default_api.config()["ClusterID"]
    except ValueError:
        pass
    except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e:
        msg.append("Failed to connect to default instance, error was {}".format(e))

    if default_api is not None and (not instance_name or instance_name == default_instance):
        # Use default settings
        return default_api

    if instance_name and default_instance and instance_name != default_instance:
        msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name))

    for m in msg:
        logger.error(m)

    abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')

# Check if git is available
def check_git_availability():
    try:
        subprocess.run(
            ['git', '--version'],
            check=True,
            stdout=subprocess.DEVNULL,
        )
    except FileNotFoundError:
        abort('git command is not available. Please ensure git is installed.')


def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, str):
        yield arg
    else:
        yield from arg

def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")

def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")

def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))

@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)


# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    if not wf["definition"]:
        logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))

    # copy collections and docker images
    if args.recursive and wf["definition"]:
        env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc,
               "ARVADOS_API_TOKEN": src.api_token,
               "PATH": os.environ["PATH"]}
        try:
            result = subprocess.run(
                ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid],
                env=env,
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
        except FileNotFoundError:
            no_arv_copy = True
        else:
            no_arv_copy = result.returncode == 2

        if no_arv_copy:
            raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.')
        elif result.returncode != 0:
            raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps')

        locations = json.loads(result.stdout)

        if locations:
            copy_collections(locations, src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid

    existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
                                             ["name", "=", wf["name"]]]).execute()
    if len(existing["items"]) == 0:
        return dst.workflows().create(body=wf).execute(num_retries=args.retries)
    else:
        return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)


def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)

# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst. obj may be a dict or a list, in which case we run
# copy_collections on every value it contains. If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
#      "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination. Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).
        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, str):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj


def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes

def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    body = {}
    for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'):
        body[d] = c[d]

    if not body["name"]:
        body['name'] = "copied from " + collection_uuid

    if args.storage_classes:
        body['storage_classes_desired'] = args.storage_classes

    body['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection

# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks. Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be
        # uniquely identified with a particular collection. As a
        # result, it is ambiguous as to what name to use for the copy.
        # Apply some heuristics to pick which collection to get the
        # name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a workflow) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    if args.replication is None:
        # Obtain default or fallback collection replication setting on the
        # destination
        try:
            args.replication = int(dst.config()["Collections"]["DefaultReplication"])
        except (KeyError, TypeError, ValueError):
            args.replication = 2

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    # go through the words
    # put each block loc into 'get' queue
    # 'get' threads get block and put it into 'put' queue
    # 'put' threads put block and then update dst_locators
    #
    # after going through the whole manifest we go back through it
    # again and build dst_manifest

    lock = threading.Lock()

    # the get queue should be unbounded because we'll add all the
    # block hashes we want to get, but these are small
    get_queue = queue.Queue()

    threadcount = 4

    # the put queue contains full data blocks
    # and if 'get' is faster than 'put' we could end up consuming
    # a great deal of RAM if it isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()

    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()

    for line in manifest.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                continue

            get_queue.put(word)

    for i in range(0, threadcount):
        get_queue.put(None)

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    get_queue.join()
    put_queue.join()

    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest.write(words[0])
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
                continue
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)

def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

def copy_project(obj_uuid, src, dst, owner_uuid, args):

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    else:
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        body={"group": {
                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])


    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record

# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository)
#
def git_rev_parse(rev, repo):
    proc = subprocess.run(
        ['git', 'rev-parse', rev],
        check=True,
        cwd=repo,
        stdout=subprocess.PIPE,
        text=True,
    )
    # With text=True, proc.stdout is already a str, not a file object.
    return proc.stdout.strip()

# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second field of the uuid. This function consults the api's
# schema to identify the object class.
#
# It returns a string such as 'Collection', 'Workflow', etc.
#
# Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    if object_uuid.startswith("s3:"):
        return 's3URL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None


def copy_from_url(url, src, dst, args):

    project_uuid = args.project_uuid
    # Ensure string of varying parameters is well-formed
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = to_keep_util.CheckCacheResult(None, None, None, None, None)

    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
                                               varying_url_params=args.varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        import boto3.session
        botosession = boto3.session.Session()
        cached = s3_to_keep.check_cached_url(src, botosession, project_uuid, url, {},
                                             prefer_cached_downloads=prefer_cached_downloads)

    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.http_to_keep(dst, project_uuid, url,
                                           varying_url_params=args.varying_url_params,
                                           prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        cached = s3_to_keep.s3_to_keep(dst, botosession, project_uuid, url,
                                       prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}


def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)


# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)

class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

if __name__ == '__main__':
    main()
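
The header comment and api_for_instance() above define the client configuration contract: a plain --src/--dst identifier such as `src` resolves to `src.conf` in a systemd or XDG configuration directory, and arvados.config.load() must find ARVADOS_API_HOST and ARVADOS_API_TOKEN there. A hypothetical example of such a file, assuming the usual KEY=VALUE settings.conf format; the host, token, and path shown are placeholders, not real values:

# src.conf (e.g. under ~/.config/arvados/) -- placeholder values
ARVADOS_API_HOST=zzzzz.arvadosapi.com
ARVADOS_API_TOKEN=v2/zzzzz-gj3su-0123456789abcde/0123456789abcdef0123456789abcdef01234567
# Optional; any of '1', 't', 'true', 'y', 'yes' disables TLS verification.
ARVADOS_API_HOST_INSECURE=false

With files like this in place, `arv-copy --src src --dst dst object-uuid` selects each cluster's credentials by name.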
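
total_collection_size() and the manifest rewrite loop in copy_collection() both depend on the manifest format: each line names a stream, then lists Keep locators of the form md5+size, then file tokens. The sketch below reproduces the same per-unique-block accounting without an Arvados client; the regex is a simplified stand-in for arvados.KeepLocator, and the two-stream manifest is a made-up example (its first locator is borrowed from the copy_collections() comment above):

import re

# Simplified stand-in for arvados.KeepLocator: "md5+size[+hints...]".
LOCATOR_RE = re.compile(r'^([0-9a-f]{32})\+(\d+)')

def manifest_size(manifest_text):
    total, seen = 0, set()
    for line in manifest_text.splitlines():
        for word in line.split()[1:]:   # words[0] is the stream name, e.g. "."
            m = LOCATOR_RE.match(word)
            if m is None:
                continue                # not a locator; presumably a file token
            md5sum, size = m.group(1), int(m.group(2))
            if md5sum not in seen:      # duplicate blocks are counted once
                seen.add(md5sum)
                total += size
    return total

# Two streams sharing one block: only 142 + 10 bytes are counted.
manifest = (". 3229739b505d2b878b62aed09895a55a+142 0:142:foo.fastq\n"
            "./sub 3229739b505d2b878b62aed09895a55a+142 781e5e245d69b566979b86e28d23f2c7+10 0:10:bar.txt\n")
print(manifest_size(manifest))  # -> 152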
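
The threaded section of copy_collection() is a bounded producer/consumer pipeline: an unbounded queue of locators feeds the 'get' threads, a bounded queue of full data blocks applies backpressure so RAM use stays limited, and one None sentinel per thread propagates shutdown from the get side to the put side. A minimal stdlib-only sketch of that shutdown and backpressure structure, with hypothetical fetch/store stand-ins in place of the Keep client calls:

import queue
import threading

THREADS = 4
get_queue = queue.Queue()          # unbounded: locators are small
put_queue = queue.Queue(THREADS)   # bounded: holds full data blocks

def fetch(locator):                # stand-in for src_keep.get()
    return b"x" * 10

def store(locator, data):          # stand-in for dst_keep.put()
    print("stored", locator, len(data), "bytes")

def get_thread():
    while True:
        locator = get_queue.get()
        if locator is None:                  # sentinel: stop one put thread too
            put_queue.put(None)
            get_queue.task_done()
            return
        put_queue.put((locator, fetch(locator)))   # blocks when put_queue is full
        get_queue.task_done()

def put_thread():
    while True:
        item = put_queue.get()
        if item is None:
            put_queue.task_done()
            return
        store(*item)
        put_queue.task_done()

for loc in ["aaa+10", "bbb+10", "ccc+10"]:
    get_queue.put(loc)
for _ in range(THREADS):                     # one sentinel per get thread
    get_queue.put(None)
for _ in range(THREADS):
    threading.Thread(target=get_thread, daemon=True).start()
    threading.Thread(target=put_thread, daemon=True).start()

get_queue.join()                             # all locators fetched
put_queue.join()                             # all blocks stored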
80def main(): 81 copy_opts = argparse.ArgumentParser(add_help=False) 82 83 copy_opts.add_argument( 84 '--version', action='version', version="%s %s" % (sys.argv[0], __version__), 85 help='Print version and exit.') 86 copy_opts.add_argument( 87 '-v', '--verbose', dest='verbose', action='store_true', 88 help='Verbose output.') 89 copy_opts.add_argument( 90 '--progress', dest='progress', action='store_true', 91 help='Report progress on copying collections. (default)') 92 copy_opts.add_argument( 93 '--no-progress', dest='progress', action='store_false', 94 help='Do not report progress on copying collections.') 95 copy_opts.add_argument( 96 '-f', '--force', dest='force', action='store_true', 97 help='Perform copy even if the object appears to exist at the remote destination.') 98 copy_opts.add_argument( 99 '--src', dest='source_arvados', 100 help=""" 101Client configuration location for the source Arvados cluster. 102May be either a configuration file path, or a plain identifier like `foo` 103to search for a configuration file `foo.conf` under a systemd or XDG configuration directory. 104If not provided, will search for a configuration file named after the cluster ID of the source object UUID. 105""", 106 ) 107 copy_opts.add_argument( 108 '--dst', dest='destination_arvados', 109 help=""" 110Client configuration location for the destination Arvados cluster. 111May be either a configuration file path, or a plain identifier like `foo` 112to search for a configuration file `foo.conf` under a systemd or XDG configuration directory. 113If not provided, will use the default client configuration from the environment or `settings.conf`. 114""", 115 ) 116 copy_opts.add_argument( 117 '--recursive', dest='recursive', action='store_true', 118 help='Recursively copy any dependencies for this object, and subprojects. (default)') 119 copy_opts.add_argument( 120 '--no-recursive', dest='recursive', action='store_false', 121 help='Do not copy any dependencies or subprojects.') 122 copy_opts.add_argument( 123 '--project-uuid', dest='project_uuid', 124 help='The UUID of the project at the destination to which the collection or workflow should be copied.') 125 copy_opts.add_argument( 126 '--replication', 127 type=arv_cmd.RangedValue(int, range(1, sys.maxsize)), 128 metavar='N', 129 help=""" 130Number of replicas per storage class for the copied collections at the destination. 131If not provided (or if provided with invalid value), 132use the destination's default replication-level setting (if found), 133or the fallback value 2. 134""") 135 copy_opts.add_argument( 136 '--storage-classes', 137 type=arv_cmd.UniqueSplit(), 138 help='Comma separated list of storage classes to be used when saving data to the destinaton Arvados instance.') 139 copy_opts.add_argument("--varying-url-params", type=str, default="", 140 help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.") 141 142 copy_opts.add_argument("--prefer-cached-downloads", action="store_true", default=False, 143 help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).") 144 145 copy_opts.add_argument( 146 'object_uuid', 147 help='The UUID of the object to be copied.') 148 copy_opts.set_defaults(progress=True) 149 copy_opts.set_defaults(recursive=True) 150 151 parser = argparse.ArgumentParser( 152 description='Copy a workflow, collection or project from one Arvados instance to another. 
On success, the uuid of the copied object is printed to stdout.', 153 parents=[copy_opts, arv_cmd.retry_opt]) 154 args = parser.parse_args() 155 156 if args.verbose: 157 arvlogger.setLevel(logging.DEBUG) 158 else: 159 arvlogger.setLevel(logging.INFO) 160 keeplogger.setLevel(logging.WARNING) 161 162 if not args.source_arvados and arvados.util.uuid_pattern.match(args.object_uuid): 163 args.source_arvados = args.object_uuid[:5] 164 165 if not args.destination_arvados and args.project_uuid: 166 args.destination_arvados = args.project_uuid[:5] 167 168 # Make sure errors trying to connect to clusters get logged. 169 googleapi_logger.setLevel(logging.WARN) 170 googleapi_logger.addHandler(log_handler) 171 172 # Create API clients for the source and destination instances 173 src_arv = api_for_instance(args.source_arvados, args.retries) 174 dst_arv = api_for_instance(args.destination_arvados, args.retries) 175 176 # Once we've successfully contacted the clusters, we probably 177 # don't want to see logging about retries (unless the user asked 178 # for verbose output). 179 if not args.verbose: 180 googleapi_logger.setLevel(logging.ERROR) 181 182 if src_arv.config()["ClusterID"] == dst_arv.config()["ClusterID"]: 183 logger.info("Copying within cluster %s", src_arv.config()["ClusterID"]) 184 else: 185 logger.info("Source cluster is %s", src_arv.config()["ClusterID"]) 186 logger.info("Destination cluster is %s", dst_arv.config()["ClusterID"]) 187 188 if not args.project_uuid: 189 args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"] 190 191 # Identify the kind of object we have been given, and begin copying. 192 t = uuid_type(src_arv, args.object_uuid) 193 194 try: 195 if t == 'Collection': 196 set_src_owner_uuid(src_arv.collections(), args.object_uuid, args) 197 result = copy_collection(args.object_uuid, 198 src_arv, dst_arv, 199 args) 200 elif t == 'Workflow': 201 set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args) 202 result = copy_workflow(args.object_uuid, src_arv, dst_arv, args) 203 elif t == 'Group': 204 set_src_owner_uuid(src_arv.groups(), args.object_uuid, args) 205 result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args) 206 elif t == 'httpURL' or t == 's3URL': 207 result = copy_from_url(args.object_uuid, src_arv, dst_arv, args) 208 else: 209 abort("cannot copy object {} of type {}".format(args.object_uuid, t)) 210 except Exception as e: 211 logger.error("%s", e, exc_info=args.verbose) 212 exit(1) 213 214 # Clean up any outstanding temp git repositories. 215 for d in local_repo_dir.values(): 216 shutil.rmtree(d, ignore_errors=True) 217 218 if not result: 219 exit(1) 220 221 # If no exception was thrown and the response does not have an 222 # error_token field, presume success 223 if result is None or 'error_token' in result or 'uuid' not in result: 224 if result: 225 logger.error("API server returned an error result: {}".format(result)) 226 exit(1) 227 228 print(result['uuid']) 229 230 if result.get('partial_error'): 231 logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error'])) 232 exit(1) 233 234 logger.info("Success: created copy with uuid {}".format(result['uuid'])) 235 exit(0)
254def api_for_instance(instance_name, num_retries): 255 msg = [] 256 if instance_name: 257 if '/' in instance_name: 258 config_file = instance_name 259 else: 260 dirs = basedirs.BaseDirectories('CONFIG') 261 config_file = next(dirs.search(f'{instance_name}.conf'), '') 262 263 try: 264 cfg = arvados.config.load(config_file) 265 266 if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg: 267 api_is_insecure = ( 268 cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set( 269 ['1', 't', 'true', 'y', 'yes'])) 270 return arvados.api('v1', 271 host=cfg['ARVADOS_API_HOST'], 272 token=cfg['ARVADOS_API_TOKEN'], 273 insecure=api_is_insecure, 274 num_retries=num_retries, 275 ) 276 else: 277 msg.append('missing ARVADOS_API_HOST or ARVADOS_API_TOKEN for {} in config file {}'.format(instance_name, config_file)) 278 except OSError as e: 279 if e.errno in (errno.EHOSTUNREACH, errno.ECONNREFUSED, errno.ECONNRESET, errno.ENETUNREACH): 280 verb = 'connect to instance from' 281 elif config_file: 282 verb = 'open' 283 else: 284 verb = 'find' 285 searchlist = ":".join(str(p) for p in dirs.search_paths()) 286 config_file = f'{instance_name}.conf in path {searchlist}' 287 msg.append(("Could not {} config file {}: {}").format( 288 verb, config_file, e.strerror)) 289 except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error) as e: 290 msg.append("Failed to connect to instance {} at {}, error was {}".format(instance_name, cfg['ARVADOS_API_HOST'], e)) 291 292 default_api = None 293 default_instance = None 294 try: 295 default_api = arvados.api('v1', num_retries=num_retries) 296 default_instance = default_api.config()["ClusterID"] 297 except ValueError: 298 pass 299 except (httplib2.error.HttpLib2Error, googleapiclient.errors.Error, OSError) as e: 300 msg.append("Failed to connect to default instance, error was {}".format(e)) 301 302 if default_api is not None and (not instance_name or instance_name == default_instance): 303 # Use default settings 304 return default_api 305 306 if instance_name and default_instance and instance_name != default_instance: 307 msg.append("Default credentials are for {} but need to connect to {}".format(default_instance, instance_name)) 308 309 for m in msg: 310 logger.error(m) 311 312 abort('Unable to find usable ARVADOS_API_HOST and ARVADOS_API_TOKEN')
326def filter_iter(arg): 327 """Iterate a filter string-or-list. 328 329 Pass in a filter field that can either be a string or list. 330 This will iterate elements as if the field had been written as a list. 331 """ 332 if isinstance(arg, str): 333 yield arg 334 else: 335 yield from arg
Iterate a filter string-or-list.
Pass in a filter field that can either be a string or list. This will iterate elements as if the field had been written as a list.
337def migrate_repository_filter(repo_filter, src_repository, dst_repository): 338 """Update a single repository filter in-place for the destination. 339 340 If the filter checks that the repository is src_repository, it is 341 updated to check that the repository is dst_repository. If it does 342 anything else, this function raises ValueError. 343 """ 344 if src_repository is None: 345 raise ValueError("component does not specify a source repository") 346 elif dst_repository is None: 347 raise ValueError("no destination repository specified to update repository filter") 348 elif repo_filter[1:] == ['=', src_repository]: 349 repo_filter[2] = dst_repository 350 elif repo_filter[1:] == ['in', [src_repository]]: 351 repo_filter[2] = [dst_repository] 352 else: 353 raise ValueError("repository filter is not a simple source match")
Update a single repository filter in-place for the destination.
If the filter checks that the repository is src_repository, it is updated to check that the repository is dst_repository. If it does anything else, this function raises ValueError.
355def migrate_script_version_filter(version_filter): 356 """Update a single script_version filter in-place for the destination. 357 358 Currently this function checks that all the filter operands are Git 359 commit hashes. If they're not, it raises ValueError to indicate that 360 the filter is not portable. It could be extended to make other 361 transformations in the future. 362 """ 363 if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])): 364 raise ValueError("script_version filter is not limited to commit hashes")
Update a single script_version filter in-place for the destination.
Currently this function checks that all the filter operands are Git commit hashes. If they’re not, it raises ValueError to indicate that the filter is not portable. It could be extended to make other transformations in the future.
366def attr_filtered(filter_, *attr_names): 367 """Return True if filter_ applies to any of attr_names, else False.""" 368 return any((name == 'any') or (name in attr_names) 369 for name in filter_iter(filter_[0]))
Return True if filter_ applies to any of attr_names, else False.
371@contextlib.contextmanager 372def exception_handler(handler, *exc_types): 373 """If any exc_types are raised in the block, call handler on the exception.""" 374 try: 375 yield 376 except exc_types as error: 377 handler(error)
If any exc_types are raised in the block, call handler on the exception.
392def copy_workflow(wf_uuid, src, dst, args): 393 # fetch the workflow from the source instance 394 wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries) 395 396 if not wf["definition"]: 397 logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid)) 398 399 # copy collections and docker images 400 if args.recursive and wf["definition"]: 401 env = {"ARVADOS_API_HOST": urllib.parse.urlparse(src._rootDesc["rootUrl"]).netloc, 402 "ARVADOS_API_TOKEN": src.api_token, 403 "PATH": os.environ["PATH"]} 404 try: 405 result = subprocess.run( 406 ["arvados-cwl-runner", "--quiet", "--print-keep-deps", "arvwf:"+wf_uuid], 407 env=env, 408 stdout=subprocess.PIPE, 409 universal_newlines=True, 410 ) 411 except FileNotFoundError: 412 no_arv_copy = True 413 else: 414 no_arv_copy = result.returncode == 2 415 416 if no_arv_copy: 417 raise Exception('Copying workflows requires arvados-cwl-runner 2.7.1 or later to be installed in PATH.') 418 elif result.returncode != 0: 419 raise Exception('There was an error getting Keep dependencies from workflow using arvados-cwl-runner --print-keep-deps') 420 421 locations = json.loads(result.stdout) 422 423 if locations: 424 copy_collections(locations, src, dst, args) 425 426 # copy the workflow itself 427 del wf['uuid'] 428 wf['owner_uuid'] = args.project_uuid 429 430 existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid], 431 ["name", "=", wf["name"]]]).execute() 432 if len(existing["items"]) == 0: 433 return dst.workflows().create(body=wf).execute(num_retries=args.retries) 434 else: 435 return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
438def workflow_collections(obj, locations, docker_images): 439 if isinstance(obj, dict): 440 loc = obj.get('location', None) 441 if loc is not None: 442 if loc.startswith("keep:"): 443 locations.append(loc[5:]) 444 445 docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None) 446 if docker_image is not None: 447 ds = docker_image.split(":", 1) 448 tag = ds[1] if len(ds)==2 else 'latest' 449 docker_images[ds[0]] = tag 450 451 for x in obj: 452 workflow_collections(obj[x], locations, docker_images) 453 elif isinstance(obj, list): 454 for x in obj: 455 workflow_collections(x, locations, docker_images)
469def copy_collections(obj, src, dst, args): 470 471 def copy_collection_fn(collection_match): 472 """Helper function for regex substitution: copies a single collection, 473 identified by the collection_match MatchObject, to the 474 destination. Returns the destination collection uuid (or the 475 portable data hash if that's what src_id is). 476 477 """ 478 src_id = collection_match.group(0) 479 if src_id not in collections_copied: 480 dst_col = copy_collection(src_id, src, dst, args) 481 if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]: 482 collections_copied[src_id] = src_id 483 else: 484 collections_copied[src_id] = dst_col['uuid'] 485 return collections_copied[src_id] 486 487 if isinstance(obj, str): 488 # Copy any collections identified in this string to dst, replacing 489 # them with the dst uuids as necessary. 490 obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj) 491 obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj) 492 return obj 493 elif isinstance(obj, dict): 494 return type(obj)((v, copy_collections(obj[v], src, dst, args)) 495 for v in obj) 496 elif isinstance(obj, list): 497 return type(obj)(copy_collections(v, src, dst, args) for v in obj) 498 return obj
501def total_collection_size(manifest_text): 502 """Return the total number of bytes in this collection (excluding 503 duplicate blocks).""" 504 505 total_bytes = 0 506 locators_seen = {} 507 for line in manifest_text.splitlines(): 508 words = line.split() 509 for word in words[1:]: 510 try: 511 loc = arvados.KeepLocator(word) 512 except ValueError: 513 continue # this word isn't a locator, skip it 514 if loc.md5sum not in locators_seen: 515 locators_seen[loc.md5sum] = True 516 total_bytes += loc.size 517 518 return total_bytes
Return the total number of bytes in this collection (excluding duplicate blocks).
520def create_collection_from(c, src, dst, args): 521 """Create a new collection record on dst, and copy Docker metadata if 522 available.""" 523 524 collection_uuid = c['uuid'] 525 body = {} 526 for d in ('description', 'manifest_text', 'name', 'portable_data_hash', 'properties'): 527 body[d] = c[d] 528 529 if not body["name"]: 530 body['name'] = "copied from " + collection_uuid 531 532 if args.storage_classes: 533 body['storage_classes_desired'] = args.storage_classes 534 535 body['owner_uuid'] = args.project_uuid 536 537 dst_collection = dst.collections().create(body=body, ensure_unique_name=True).execute(num_retries=args.retries) 538 539 # Create docker_image_repo+tag and docker_image_hash links 540 # at the destination. 541 for link_class in ("docker_image_repo+tag", "docker_image_hash"): 542 docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items'] 543 544 for src_link in docker_links: 545 body = {key: src_link[key] 546 for key in ['link_class', 'name', 'properties']} 547 body['head_uuid'] = dst_collection['uuid'] 548 body['owner_uuid'] = args.project_uuid 549 550 lk = dst.links().create(body=body).execute(num_retries=args.retries) 551 logger.debug('created dst link {}'.format(lk)) 552 553 return dst_collection
Create a new collection record on dst, and copy Docker metadata if available.
577def copy_collection(obj_uuid, src, dst, args): 578 if arvados.util.keep_locator_pattern.match(obj_uuid): 579 # If the obj_uuid is a portable data hash, it might not be 580 # uniquely identified with a particular collection. As a 581 # result, it is ambiguous as to what name to use for the copy. 582 # Apply some heuristics to pick which collection to get the 583 # name from. 584 srccol = src.collections().list( 585 filters=[['portable_data_hash', '=', obj_uuid]], 586 order="created_at asc" 587 ).execute(num_retries=args.retries) 588 589 items = srccol.get("items") 590 591 if not items: 592 logger.warning("Could not find collection with portable data hash %s", obj_uuid) 593 return 594 595 c = None 596 597 if len(items) == 1: 598 # There's only one collection with the PDH, so use that. 599 c = items[0] 600 if not c: 601 # See if there is a collection that's in the same project 602 # as the root item (usually a workflow) being copied. 603 for i in items: 604 if i.get("owner_uuid") == src_owner_uuid and i.get("name"): 605 c = i 606 break 607 if not c: 608 # Didn't find any collections located in the same project, so 609 # pick the oldest collection that has a name assigned to it. 610 for i in items: 611 if i.get("name"): 612 c = i 613 break 614 if not c: 615 # None of the collections have names (?!), so just pick the 616 # first one. 617 c = items[0] 618 619 # list() doesn't return manifest text (and we don't want it to, 620 # because we don't need the same maninfest text sent to us 50 621 # times) so go and retrieve the collection object directly 622 # which will include the manifest text. 623 c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries) 624 else: 625 # Assume this is an actual collection uuid, so fetch it directly. 626 c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries) 627 628 # If a collection with this hash already exists at the 629 # destination, and 'force' is not true, just return that 630 # collection. 631 if not args.force: 632 if 'portable_data_hash' in c: 633 colhash = c['portable_data_hash'] 634 else: 635 colhash = c['uuid'] 636 dstcol = dst.collections().list( 637 filters=[['portable_data_hash', '=', colhash]] 638 ).execute(num_retries=args.retries) 639 if dstcol['items_available'] > 0: 640 for d in dstcol['items']: 641 if ((args.project_uuid == d['owner_uuid']) and 642 (c.get('name') == d['name']) and 643 (c['portable_data_hash'] == d['portable_data_hash'])): 644 return d 645 c['manifest_text'] = dst.collections().get( 646 uuid=dstcol['items'][0]['uuid'] 647 ).execute(num_retries=args.retries)['manifest_text'] 648 return create_collection_from(c, src, dst, args) 649 650 if args.replication is None: 651 # Obtain default or fallback collection replication setting on the 652 # destination 653 try: 654 args.replication = int(dst.config()["Collections"]["DefaultReplication"]) 655 except (KeyError, TypeError, ValueError): 656 args.replication = 2 657 658 # Fetch the collection's manifest. 659 manifest = c['manifest_text'] 660 logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest) 661 662 # Copy each block from src_keep to dst_keep. 663 # Use the newly signed locators returned from dst_keep to build 664 # a new manifest as we go. 
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = io.StringIO()
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    # Go through the manifest word by word:
    #   * put each block locator into the 'get' queue
    #   * 'get' threads fetch each block and put it into the 'put' queue
    #   * 'put' threads store each block, then record its new locator in
    #     dst_locators
    #
    # After working through the whole manifest, go back through it
    # again and build dst_manifest using the new locators.

    lock = threading.Lock()

    # The get queue should be unbounded: we'll add all the block
    # hashes we want to get up front, but those entries are small.
    get_queue = queue.Queue()

    threadcount = 4

    # The put queue contains full data blocks, so if 'get' is faster
    # than 'put' we could end up consuming a great deal of RAM if it
    # isn't bounded.
    put_queue = queue.Queue(threadcount)
    transfer_error = []

    def get_thread():
        while True:
            word = get_queue.get()
            if word is None:
                put_queue.put(None)
                get_queue.task_done()
                return

            blockhash = arvados.KeepLocator(word).md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    get_queue.task_done()
                    continue

            try:
                logger.debug("Getting block %s", word)
                data = src_keep.get(word)
                put_queue.put((word, data))
            except Exception as e:
                logger.error("Error getting block %s: %s", word, e)
                transfer_error.append(e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
            finally:
                get_queue.task_done()

    def put_thread():
        nonlocal bytes_written
        while True:
            item = put_queue.get()
            if item is None:
                put_queue.task_done()
                return

            word, data = item
            loc = arvados.KeepLocator(word)
            blockhash = loc.md5sum
            with lock:
                if blockhash in dst_locators:
                    # Already uploaded
                    put_queue.task_done()
                    continue

            try:
                logger.debug("Putting block %s (%s bytes)", blockhash, loc.size)
                dst_locator = dst_keep.put(data, copies=args.replication, classes=(args.storage_classes or []))
                with lock:
                    dst_locators[blockhash] = dst_locator
                    bytes_written += loc.size
                    if progress_writer:
                        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
            except Exception as e:
                logger.error("Error putting block %s (%s bytes): %s", blockhash, loc.size, e)
                try:
                    # Drain the 'get' queue so we end early
                    while True:
                        get_queue.get(False)
                        get_queue.task_done()
                except queue.Empty:
                    pass
                transfer_error.append(e)
            finally:
                put_queue.task_done()

    for line in manifest.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                continue

            get_queue.put(word)

    for i in range(0, threadcount):
        get_queue.put(None)

    for i in range(0, threadcount):
        threading.Thread(target=get_thread, daemon=True).start()

    for i in range(0, threadcount):
        threading.Thread(target=put_thread, daemon=True).start()

    get_queue.join()
    put_queue.join()

    if len(transfer_error) > 0:
        return {"error_token": "Failed to transfer blocks"}

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest.write(words[0])
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest.write(' ')
                dst_manifest.write(word)
                continue
            blockhash = loc.md5sum
            dst_manifest.write(' ')
            dst_manifest.write(dst_locators[blockhash])
        dst_manifest.write("\n")

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())

    c['manifest_text'] = dst_manifest.getvalue()
    return create_collection_from(c, src, dst, args)
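# For illustration: a manifest line pairs a stream name with signed block
# locators and file tokens.  The rewrite loop above swaps each source-signed
# locator for the destination-signed one returned by dst_keep.put(), leaving
# stream names and file tokens untouched.  The hash and signatures below are
# made-up examples.
#
#   before: . acbd18db4cc2f85cedef654fccc4a4d8+3+A<src-signature> 0:3:foo.txt
#   after:  . acbd18db4cc2f85cedef654fccc4a4d8+3+A<dst-signature> 0:3:foo.txt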
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
def copy_project(obj_uuid, src, dst, owner_uuid, args):

    src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # Create/update the destination project
    existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
                                          ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
    if len(existing["items"]) == 0:
        project_record = dst.groups().create(body={"group": {"group_class": "project",
                                                             "owner_uuid": owner_uuid,
                                                             "name": src_project_record["name"]}}).execute(num_retries=args.retries)
    else:
        project_record = existing["items"][0]

    dst.groups().update(uuid=project_record["uuid"],
                        body={"group": {
                            "description": src_project_record["description"]}}).execute(num_retries=args.retries)

    args.project_uuid = project_record["uuid"]

    logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])

    partial_error = ""

    # Copy collections
    try:
        copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
                         src, dst, args)
    except Exception as e:
        partial_error += "\n" + str(e)

    # Copy workflows
    for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
        try:
            copy_workflow(w["uuid"], src, dst, args)
        except Exception as e:
            partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)

    # Copy subprojects
    if args.recursive:
        for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
            try:
                copy_project(g["uuid"], src, dst, project_record["uuid"], args)
            except Exception as e:
                partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)

    project_record["partial_error"] = partial_error

    return project_record
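# A minimal usage sketch (hypothetical UUIDs), assuming `src_arv` and
# `dst_arv` are API clients from api_for_instance() and `args` is the parsed
# command line.  Per-item failures are accumulated rather than raised, so
# callers should check the returned record for details.
def _example_copy_project(src_arv, dst_arv, args):
    dst_project = copy_project('zzzzz-j7d0g-zzzzzzzzzzzzzzz', src_arv, dst_arv,
                               args.project_uuid, args)
    if dst_project["partial_error"]:
        logger.warning("project copied with errors: %s", dst_project["partial_error"])
    return dst_project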
def uuid_type(api, object_uuid):
    if re.match(arvados.util.keep_locator_pattern, object_uuid):
        return 'Collection'

    if object_uuid.startswith("http:") or object_uuid.startswith("https:"):
        return 'httpURL'

    if object_uuid.startswith("s3:"):
        return 's3URL'

    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
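# For illustration, uuid_type() dispatches on the shape of the identifier.
# The identifiers below are hypothetical, and the last case needs a live API
# client whose discovery schema maps the infix (e.g. '4zz18') to a type:
#
#   uuid_type(api, 'acbd18db4cc2f85cedef654fccc4a4d8+3')  -> 'Collection'
#   uuid_type(api, 'https://example.com/data.tar.gz')     -> 'httpURL'
#   uuid_type(api, 's3://example-bucket/data.tar.gz')     -> 's3URL'
#   uuid_type(api, 'zzzzz-4zz18-zzzzzzzzzzzzzzz')         -> schema lookup result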
def copy_from_url(url, src, dst, args):

    project_uuid = args.project_uuid
    prefer_cached_downloads = args.prefer_cached_downloads

    cached = to_keep_util.CheckCacheResult(None, None, None, None, None)

    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.check_cached_url(src, project_uuid, url, {},
                                               varying_url_params=args.varying_url_params,
                                               prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        import boto3.session
        botosession = boto3.session.Session()
        cached = s3_to_keep.check_cached_url(src, botosession, project_uuid, url, {},
                                             prefer_cached_downloads=prefer_cached_downloads)

    # If the URL is already cached in a collection on the source cluster,
    # copy that collection to the destination.
    if cached[2] is not None:
        return copy_collection(cached[2], src, dst, args)

    # Otherwise, download the URL directly into a new collection on the
    # destination cluster.
    if url.startswith("http:") or url.startswith("https:"):
        cached = http_to_keep.http_to_keep(dst, project_uuid, url,
                                           varying_url_params=args.varying_url_params,
                                           prefer_cached_downloads=prefer_cached_downloads)
    elif url.startswith("s3:"):
        cached = s3_to_keep.s3_to_keep(dst, botosession, project_uuid, url,
                                       prefer_cached_downloads=prefer_cached_downloads)

    if cached is not None:
        return {"uuid": cached[2]}
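# A minimal usage sketch, assuming `src_arv` and `dst_arv` are API clients
# from api_for_instance() and `args` is the parsed command line; the URL is
# a hypothetical example.
def _example_copy_from_url(src_arv, dst_arv, args):
    result = copy_from_url('https://example.com/reference/genome.fa.gz',
                           src_arv, dst_arv, args)
    # Either path yields a dict with a 'uuid' key: a full collection record
    # when a cached copy was found on src, or {"uuid": ...} after a fresh
    # download to dst.
    return result["uuid"]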
def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
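# For example (hypothetical values), a collection 25% of the way through a
# 4 GiB transfer renders as:
#
#   human_progress('zzzzz-4zz18-zzzzzzzzzzzzzzz', 1 << 30, 4 << 30)
#   -> '\rzzzzz-4zz18-zzzzzzzzzzzzzzz: 1024M / 4096M 25.0% '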
class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")
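# A minimal usage sketch: ProgressWriter pairs a formatting callback such as
# human_progress with an output stream (stderr by default).  The byte counts
# here are hypothetical.
def _example_progress():
    pw = ProgressWriter(human_progress)
    pw.report('zzzzz-4zz18-zzzzzzzzzzzzzzz', 512 << 20, 1 << 30)
    pw.finish()  # terminate the \r-based progress line with a newline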