Source Code for Module arvados.commands.arv_copy

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# arv-copy [--recursive] [--no-recursive] --src src --dst dst object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
# By default, arv-copy recursively copies any dependent objects
# necessary to make the object functional in the new instance
# (e.g. for a pipeline instance, arv-copy copies the pipeline
# template, input collections, docker images, and git repositories).
# If --no-recursive is given, arv-copy copies only the single record
# identified by object-uuid.
#
# The user must have files $HOME/.config/arvados/{src}.conf and
# $HOME/.config/arvados/{dst}.conf with valid login credentials for
# instances src and dst.  If either of these files is not found,
# arv-copy will issue an error.
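
# Example invocation (illustrative; the cluster names, repository name,
# and UUIDs below are placeholders):
#
#   arv-copy --src foo --dst bar --dst-git-repo jsmith \
#       --project-uuid bar01-j7d0g-xxxxxxxxxxxxxxx zzzzz-p5p6p-xxxxxxxxxxxxxxx
#
# with a minimal $HOME/.config/arvados/foo.conf containing the keys read
# by api_for_instance() below:
#
#   ARVADOS_API_HOST=foo.example.com
#   ARVADOS_API_TOKEN=<token for instance foo>
#   ARVADOS_API_HOST_INSECURE=false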

from __future__ import division
from future import standard_library
from future.utils import listvalues
standard_library.install_aliases()
from past.builtins import basestring
from builtins import object
import argparse
import contextlib
import getpass
import os
import re
import shutil
import sys
import logging
import tempfile
import urllib.parse

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import ruamel.yaml as yaml

from arvados.api import OrderedJsonModel
from arvados._version import __version__

COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# local_repo_dir records which git repositories from the Arvados source
# instance have been checked out locally during this run, and to which
# directories.
# e.g. if repository 'twp' from src_arv has been cloned into
# /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
#
local_repo_dir = {}

# Map of collections that have been copied in this session, from source
# collection UUID to destination collection UUID.
collections_copied = {}

# Set of (repository, script_version) two-tuples of commits copied in git.
scripts_copied = set()

# The owner_uuid of the object being copied
src_owner_uuid = None

def main():
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--force-filters', action='store_true', default=False,
        help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
    copy_opts.add_argument(
        '--src', dest='source_arvados', required=True,
        help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados', required=True,
        help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
    copy_opts.add_argument(
        '--dst-git-repo', dest='dst_git_repo',
        help='The name of the destination git repository. Required when copying a pipeline recursively.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the pipeline should be copied.')
    copy_opts.add_argument(
        '--allow-git-http-src', action="store_true",
        help='Allow cloning git repositories over insecure http')
    copy_opts.add_argument(
        '--allow-git-http-dst', action="store_true",
        help='Allow pushing git repositories over insecure http')

    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    # Create API clients for the source and destination instances.
    src_arv = api_for_instance(args.source_arvados)
    dst_arv = api_for_instance(args.destination_arvados)

    if not args.project_uuid:
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and begin copying.
    t = uuid_type(src_arv, args.object_uuid)
    if t == 'Collection':
        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
        result = copy_collection(args.object_uuid,
                                 src_arv, dst_arv,
                                 args)
    elif t == 'PipelineInstance':
        set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
        result = copy_pipeline_instance(args.object_uuid,
                                        src_arv, dst_arv,
                                        args)
    elif t == 'PipelineTemplate':
        set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
        result = copy_pipeline_template(args.object_uuid,
                                        src_arv, dst_arv, args)
    elif t == 'Workflow':
        set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
        result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
    else:
        abort("cannot copy object {} of type {}".format(args.object_uuid, t))

    # Clean up any outstanding temp git repositories.
    for d in listvalues(local_repo_dir):
        shutil.rmtree(d, ignore_errors=True)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success.
    if 'error_token' in result or 'uuid' not in result:
        logger.error("API server returned an error result: {}".format(result))
        exit(1)

    logger.info("")
    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)

def set_src_owner_uuid(resource, uuid, args):
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")

# api_for_instance(instance_name)
#
# Creates an API client for the Arvados instance identified by
# instance_name.
#
# If instance_name contains a slash, it is presumed to be a path
# (either relative or absolute) to a file with Arvados configuration
# settings.
#
# Otherwise, it is presumed to be the name of a file in
# $HOME/.config/arvados/instance_name.conf
#
def api_for_instance(instance_name):
    if '/' in instance_name:
        config_file = instance_name
    else:
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client

# Check if git is available.
def check_git_availability():
    try:
        arvados.util.run_command(['git', '--help'])
    except Exception:
        abort('git command is not available. Please ensure git is installed.')

# copy_pipeline_instance(pi_uuid, src, dst, args)
#
# Copies a pipeline instance identified by pi_uuid from src to dst.
#
# If the args.recursive option is set:
#   1. Copies all input collections
#      * For each component in the pipeline, include all collections
#        listed as job dependencies for that component
#   2. Copy docker images
#   3. Copy git repositories
#   4. Copy the pipeline template
#
# The only changes made to the copied pipeline instance are:
#   1. The original pipeline instance UUID is preserved in
#      the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
#   2. The pipeline_template_uuid is changed to the new template uuid.
#   3. The owner_uuid of the instance is changed to the user who
#      copied it.
#
def copy_pipeline_instance(pi_uuid, src, dst, args):
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy the pipeline template and save the copied template.
        if pi.get('pipeline_template_uuid', None):
            pt = copy_pipeline_template(pi['pipeline_template_uuid'],
                                        src, dst, args)

        # Copy input collections, docker images and git repos.
        pi = copy_collections(pi, src, dst, args)
        copy_git_repos(pi, src, dst, args.dst_git_repo, args)
        copy_docker_images(pi, src, dst, args)

        # Update the fields of the pipeline instance with the copied
        # pipeline template.
        if pi.get('pipeline_template_uuid', None):
            pi['pipeline_template_uuid'] = pt['uuid']

    else:
        # not recursive
        logger.info("Copying only pipeline instance %s.", pi_uuid)
        logger.info("You are responsible for making sure all pipeline dependencies have been updated.")

    # Update the pipeline instance properties, and create the new
    # instance at dst.
    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
    pi['description'] = "Pipeline copied from {}\n\n{}".format(
        pi_uuid,
        pi['description'] if pi.get('description', None) else '')

    pi['owner_uuid'] = args.project_uuid

    del pi['uuid']

    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
    return new_pi

def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        return iter((arg,))
    else:
        return iter(arg)
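
# For example (values illustrative):
#
#   list(filter_iter("docker"))    -> ["docker"]
#   list(filter_iter(["a", "b"]))  -> ["a", "b"]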

def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository.  If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
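
# For example, with src_repository "jsmith/src" and dst_repository
# "jsmith/dst" (names illustrative):
#
#   ["repository", "=", "jsmith/src"]     becomes ["repository", "=", "jsmith/dst"]
#   ["repository", "in", ["jsmith/src"]]  becomes ["repository", "in", ["jsmith/dst"]]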

def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes.  If they're not, it raises ValueError to indicate that
    the filter is not portable.  It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")

def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))

@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)

def migrate_components_filters(template_components, dst_git_repo):
    """Update template component filters in-place for the destination.

    template_components is a dictionary of components in a pipeline template.
    This method walks over each component's filters, and updates them to have
    identical semantics on the destination cluster.  It returns a list of
    error strings that describe what filters could not be updated safely.

    dst_git_repo is the name of the destination Git repository, which can
    be None if that is not known.
    """
    errors = []
    for cname, cspec in template_components.items():
        def add_error(errmsg):
            errors.append("{}: {}".format(cname, errmsg))
        if not isinstance(cspec, dict):
            add_error("value is not a component definition")
            continue
        src_repository = cspec.get('repository')
        filters = cspec.get('filters', [])
        if not isinstance(filters, list):
            add_error("filters are not a list")
            continue
        for cfilter in filters:
            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
                add_error("malformed filter {!r}".format(cfilter))
                continue
            if attr_filtered(cfilter, 'repository'):
                with exception_handler(add_error, ValueError):
                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
            if attr_filtered(cfilter, 'script_version'):
                with exception_handler(add_error, ValueError):
                    migrate_script_version_filter(cfilter)
    return errors

# copy_pipeline_template(pt_uuid, src, dst, args)
#
# Copies a pipeline template identified by pt_uuid from src to dst.
#
# If args.recursive is True, also copy any collections, docker
# images and git repositories that this template references.
#
# The owner_uuid of the new template is changed to that of the user
# who copied the template.
#
# Returns the copied pipeline template object.
#
def copy_pipeline_template(pt_uuid, src, dst, args):
    # fetch the pipeline template from the source instance
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)

    if not args.force_filters:
        filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
        if filter_errors:
            abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
                  "\n".join(filter_errors))

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy input collections, docker images and git repos.
        pt = copy_collections(pt, src, dst, args)
        copy_git_repos(pt, src, dst, args.dst_git_repo, args)
        copy_docker_images(pt, src, dst, args)

    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
        pt_uuid,
        pt['description'] if pt.get('description', None) else '')
    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
    del pt['uuid']

    pt['owner_uuid'] = args.project_uuid

    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)

# copy_workflow(wf_uuid, src, dst, args)
#
# Copies a workflow identified by wf_uuid from src to dst.
#
# If args.recursive is True, also copy any collections
# referenced in the workflow definition yaml.
#
# The owner_uuid of the new workflow is set to any given
# project_uuid or the user who copied the template.
#
# Returns the copied workflow object.
#
def copy_workflow(wf_uuid, src, dst, args):
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # copy collections and docker images
    if args.recursive:
        wf_def = yaml.safe_load(wf["definition"])
        if wf_def is not None:
            locations = []
            docker_images = {}
            graph = wf_def.get('$graph', None)
            if graph is not None:
                workflow_collections(graph, locations, docker_images)
            else:
                workflow_collections(wf_def, locations, docker_images)

            if locations:
                copy_collections(locations, src, dst, args)

            for image in docker_images:
                copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid
    return dst.workflows().create(body=wf).execute(num_retries=args.retries)

def workflow_collections(obj, locations, docker_images):
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds) == 2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
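
# For example, walking this (hypothetical) workflow fragment:
#
#   {"location": "keep:99999999999999999999999999999999+99/input.fastq",
#    "hints": [{"dockerPull": "ubuntu:18.04"}]}
#
# appends "99999999999999999999999999999999+99/input.fastq" to locations
# and records docker_images["ubuntu"] = "18.04".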

# copy_collections(obj, src, dst, args)
#
# Recursively copies all collections referenced by 'obj' from src
# to dst.  obj may be a dict or a list, in which case we run
# copy_collections on every value it contains.  If it is a string,
# search it for any substring that matches a collection hash or uuid
# (this will find hidden references to collections like
# "input0": "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)")
#
# Returns a copy of obj with any old collection uuids replaced by
# the new ones.
#
def copy_collections(obj, src, dst, args):

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).

        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj
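
# For example (identifiers hypothetical): if a string value such as
# "$(file 3229739b505d2b878b62aed09895a55a+142/HWI-ST1027_129_D0THKACXX.1_1.fastq)"
# appears anywhere in obj, the embedded portable data hash matches
# portable_data_hash_pattern, so copy_collection_fn copies that
# collection and substitutes the destination identifier in the string.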

def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo.  It also updates jobspec in place to
    refer to names on the destination.
    """
    repo = jobspec.get('repository')
    if repo is None:
        return
    # script_version is the "script_version" parameter from the source
    # component or job.  If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    if script_key not in scripts_copied:
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    repo_dir = local_repo_dir[repo]
    for version_key in ['script_version', 'supplied_script_version']:
        if version_key in jobspec:
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)

# copy_git_repos(p, src, dst, dst_repo, args)
#
# Copies all git repositories referenced by pipeline instance or
# template 'p' from src to dst.
#
# For each component c in the pipeline:
#   * Copy git repositories named in c['repository'] and c['job']['repository'] if present
#   * Rename script versions:
#       * c['script_version']
#       * c['job']['script_version']
#       * c['job']['supplied_script_version']
#     to the commit hashes they resolve to, since any symbolic
#     names (tags, branches) are not preserved in the destination repo.
#
# The pipeline object is updated in place with the new repository
# names.  The return value is undefined.
#
def copy_git_repos(p, src, dst, dst_repo, args):
    for component in p['components'].values():
        migrate_jobspec(component, src, dst, dst_repo, args)
        if 'job' in component:
            migrate_jobspec(component['job'], src, dst, dst_repo, args)

def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
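
# For example, the manifest line below (locators illustrative) contributes
# 3 + 3 = 6 bytes; repeating either locator elsewhere in the manifest would
# add nothing, because each md5sum is only counted once:
#
#   . acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 0:6:ab.txt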

def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    del c['uuid']

    if not c["name"]:
        c['name'] = "copied from " + collection_uuid

    if 'properties' in c:
        del c['properties']

    c['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection

# copy_collection(obj_uuid, src, dst, args)
#
# Copies the collection identified by obj_uuid from src to dst.
# Returns the collection object created at dst.
#
# If args.progress is True, produce a human-friendly progress
# report.
#
# If a collection with the desired portable_data_hash already
# exists at dst, and args.force is False, copy_collection returns
# the existing collection without copying any blocks.  Otherwise
# (if no collection exists or if args.force is True)
# copy_collection copies all of the collection data blocks from src
# to dst.
#
# For this application, it is critical to preserve the
# collection's manifest hash, which is not guaranteed with the
# arvados.CollectionReader and arvados.CollectionWriter classes.
# Copying each block in the collection manually, followed by
# the manifest block, ensures that the collection's manifest
# hash will not change.
#
def copy_collection(obj_uuid, src, dst, args):
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might not be uniquely
        # identified with a particular collection.  As a result, it is
        # ambiguous as to what name to use for the copy.  Apply some
        # heuristics to pick which collection to get the name from.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item (usually a pipeline) being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = ""
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest += words[0]
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest += ' ' + word
                continue
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                if progress_writer:
                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]
        dst_manifest += "\n"

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
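
# Illustrative rewrite (locators and signatures hypothetical): a source
# manifest line such as
#
#   . acbd18db4cc2f85cedef654fccc4a4d8+3+A<src-sig>@12345678 0:3:foo.txt
#
# is emitted into dst_manifest with the same md5sum and size but the new
# signature returned by dst_keep.put(), e.g.
#
#   . acbd18db4cc2f85cedef654fccc4a4d8+3+A<dst-sig>@12345678 0:3:foo.txt
#
# Since block hashes, sizes, and file tokens are unchanged, the
# collection's portable data hash is preserved.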

def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    git_config = []
    git_url = None
    for url in priority:
        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                                     env={"HOME": os.environ["HOME"],
                                          "ARVADOS_API_TOKEN": api.api_token,
                                          "GIT_ASKPASS": "/bin/false"})
        except arvados.errors.CommandFailedError:
            pass
        else:
            git_url = url
            break

    if not git_url:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)

# copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args)
#
# Copies commits from git repository 'src_git_repo' on Arvados
# instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
# and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
# or "jsmith")
#
# All commits will be copied to a destination branch named for the
# source repository URL.
#
# The destination repository must already exist.
#
# The user running this command must be authenticated
# to both repositories.
#
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    # Identify the fetch and push URLs for the git repositories.

    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
                                        local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
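
# For example (URL and commit hypothetical), src_git_url
# "https://git.example.org/jsmith/src.git" with script_version "acfd13b"
# produces the destination branch name
# "https_git_example_org_jsmith_src_git_acfd13b".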

def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    runtime_constraints field from src to dst."""

    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
    for c_name, c_info in pipeline['components'].items():
        if ('runtime_constraints' in c_info and
            'docker_image' in c_info['runtime_constraints']):
            copy_docker_image(
                c_info['runtime_constraints']['docker_image'],
                c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
                src, dst, args)

def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst.  Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))

# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# git repository 'repo' (which must be the path of a local git
# repository)
#
def git_rev_parse(rev, repo):
    gitout, giterr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return gitout.strip()

# uuid_type(api, object_uuid)
#
# Returns the name of the class that object_uuid belongs to, based on
# the second field of the uuid.  This function consults the api's
# schema to identify the object class.
#
# It returns a string such as 'Collection', 'PipelineInstance', etc.
#
# Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        return 'Collection'
    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
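
# For example (uuid hypothetical): "zzzzz-4zz18-xxxxxxxxxxxxxxx" splits into
# three fields, and its type prefix '4zz18' matches the uuidPrefix of the
# Collection schema, so uuid_type returns 'Collection'.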

def abort(msg, code=1):
    logger.info("arv-copy: %s", msg)
    exit(code)


# Code for reporting on the progress of a collection upload.
# Stolen from arvados.commands.put.ArvPutCollectionWriter
# TODO(twp): figure out how to refactor into a shared library
# (may involve refactoring some arvados.commands.arv_copy.copy_collection
# code)

def machine_progress(obj_uuid, bytes_written, bytes_expected):
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)

def human_progress(obj_uuid, bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)
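
# For example:
#
#   human_progress(uuid, 52428800, 104857600)
#
# returns "\r<uuid>: 50M / 100M 50.0% " (the >> 20 shifts convert
# bytes to MiB).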

class ProgressWriter(object):
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")

if __name__ == '__main__':
    main()