1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# arv-copy: copy an Arvados pipeline instance, template, workflow, or
# collection from one Arvados instance to another.

from __future__ import division
from future import standard_library
from future.utils import listvalues
standard_library.install_aliases()
from past.builtins import basestring
from builtins import object
import argparse
import contextlib
import getpass
import os
import re
import shutil
import sys
import logging
import tempfile
import urllib.parse

import arvados
import arvados.config
import arvados.keep
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.keepdocker
import ruamel.yaml as yaml

from arvados.api import OrderedJsonModel
from arvados._version import __version__

# Matches a full or abbreviated (1-40 hex chars) git commit hash.
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')

logger = logging.getLogger('arvados.arv-copy')

# Maps the name of each git repository cloned from the source instance
# during this run to its local temporary checkout directory; the
# directories are removed at the end of main().
local_repo_dir = {}

# Caches collections already copied in this session so each collection
# is copied at most once.  Maps source collection uuid (or portable
# data hash) -> destination collection uuid (or the same hash).
collections_copied = {}

# Set of (repository, script_version) pairs whose commits have already
# been pushed to the destination git repository.
scripts_copied = set()

# owner_uuid of the root object being copied; set by set_src_owner_uuid()
# and consulted when disambiguating collections by portable data hash.
src_owner_uuid = None
def set_src_owner_uuid(resource, uuid, args):
    """Record the owner_uuid of the root object being copied.

    Fetches the object `uuid` via the given API `resource` and stores its
    owner_uuid in the module-level src_owner_uuid, which copy_collection()
    uses to prefer collections from the same source project.
    """
    global src_owner_uuid
    c = resource.get(uuid=uuid).execute(num_retries=args.retries)
    src_owner_uuid = c.get("owner_uuid")

def main():
    """Entry point: parse arguments, dispatch on object type, copy it."""
    copy_opts = argparse.ArgumentParser(add_help=False)

    copy_opts.add_argument(
        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
        help='Print version and exit.')
    copy_opts.add_argument(
        '-v', '--verbose', dest='verbose', action='store_true',
        help='Verbose output.')
    copy_opts.add_argument(
        '--progress', dest='progress', action='store_true',
        help='Report progress on copying collections. (default)')
    copy_opts.add_argument(
        '--no-progress', dest='progress', action='store_false',
        help='Do not report progress on copying collections.')
    copy_opts.add_argument(
        '-f', '--force', dest='force', action='store_true',
        help='Perform copy even if the object appears to exist at the remote destination.')
    copy_opts.add_argument(
        '--force-filters', action='store_true', default=False,
        help="Copy pipeline template filters verbatim, even if they act differently on the destination cluster.")
    copy_opts.add_argument(
        '--src', dest='source_arvados', required=True,
        help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--dst', dest='destination_arvados', required=True,
        help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
    copy_opts.add_argument(
        '--recursive', dest='recursive', action='store_true',
        help='Recursively copy any dependencies for this object. (default)')
    copy_opts.add_argument(
        '--no-recursive', dest='recursive', action='store_false',
        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
    copy_opts.add_argument(
        '--dst-git-repo', dest='dst_git_repo',
        help='The name of the destination git repository. Required when copying a pipeline recursively.')
    copy_opts.add_argument(
        '--project-uuid', dest='project_uuid',
        help='The UUID of the project at the destination to which the pipeline should be copied.')
    copy_opts.add_argument(
        '--allow-git-http-src', action="store_true",
        help='Allow cloning git repositories over insecure http')
    copy_opts.add_argument(
        '--allow-git-http-dst', action="store_true",
        help='Allow pushing git repositories over insecure http')

    copy_opts.add_argument(
        'object_uuid',
        help='The UUID of the object to be copied.')
    copy_opts.set_defaults(progress=True)
    copy_opts.set_defaults(recursive=True)

    parser = argparse.ArgumentParser(
        description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
        parents=[copy_opts, arv_cmd.retry_opt])
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    # Create API clients for the source and destination instances.
    src_arv = api_for_instance(args.source_arvados)
    dst_arv = api_for_instance(args.destination_arvados)

    if not args.project_uuid:
        # Default to copying into the destination user's home project.
        args.project_uuid = dst_arv.users().current().execute(num_retries=args.retries)["uuid"]

    # Identify the kind of object we have been given, and copy it.
    t = uuid_type(src_arv, args.object_uuid)
    if t == 'Collection':
        set_src_owner_uuid(src_arv.collections(), args.object_uuid, args)
        result = copy_collection(args.object_uuid,
                                 src_arv, dst_arv,
                                 args)
    elif t == 'PipelineInstance':
        set_src_owner_uuid(src_arv.pipeline_instances(), args.object_uuid, args)
        result = copy_pipeline_instance(args.object_uuid,
                                        src_arv, dst_arv,
                                        args)
    elif t == 'PipelineTemplate':
        set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
        result = copy_pipeline_template(args.object_uuid,
                                        src_arv, dst_arv, args)
    elif t == 'Workflow':
        set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
        result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
    else:
        abort("cannot copy object {} of type {}".format(args.object_uuid, t))

    # Clean up any outstanding temp git repositories.
    for d in listvalues(local_repo_dir):
        shutil.rmtree(d, ignore_errors=True)

    # If no exception was thrown and the response does not have an
    # error_token field, presume success
    if 'error_token' in result or 'uuid' not in result:
        logger.error("API server returned an error result: {}".format(result))
        exit(1)

    logger.info("")
    logger.info("Success: created copy with uuid {}".format(result['uuid']))
    exit(0)
175
180
181
182
183
184
185
186
187
188
189
190
191
192
def api_for_instance(instance_name):
    """Return an API client for the Arvados instance identified by instance_name.

    instance_name is either a pathname to a config file, or (for example)
    "foo" as shorthand for $HOME/.config/arvados/foo.conf.  The config file
    must supply ARVADOS_API_HOST and ARVADOS_API_TOKEN; aborts otherwise.
    """
    if '/' in instance_name:
        config_file = instance_name
    else:
        config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))

    try:
        cfg = arvados.config.load(config_file)
    except (IOError, OSError) as e:
        abort(("Could not open config file {}: {}\n" +
               "You must make sure that your configuration tokens\n" +
               "for Arvados instance {} are in {} and that this\n" +
               "file is readable.").format(
                   config_file, e, instance_name, config_file))

    if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
        api_is_insecure = (
            cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
                ['1', 't', 'true', 'y', 'yes']))
        client = arvados.api('v1',
                             host=cfg['ARVADOS_API_HOST'],
                             token=cfg['ARVADOS_API_TOKEN'],
                             insecure=api_is_insecure,
                             model=OrderedJsonModel())
    else:
        abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
    return client
220
def check_git_availability():
    """Abort with a helpful message if the git command is not runnable."""
    try:
        arvados.util.run_command(['git', '--help'])
    except Exception:
        abort('git command is not available. Please ensure git is installed.')
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def copy_pipeline_instance(pi_uuid, src, dst, args):
    """Copy pipeline instance pi_uuid from src to dst.

    If args.recursive is True, also copy the referenced pipeline template,
    collections, git repositories, and docker images.  Returns the new
    pipeline instance record created at the destination.
    """
    # Fetch the pipeline instance record.
    pi = src.pipeline_instances().get(uuid=pi_uuid).execute(num_retries=args.retries)

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy the pipeline template and save the copied template.
        if pi.get('pipeline_template_uuid', None):
            pt = copy_pipeline_template(pi['pipeline_template_uuid'],
                                        src, dst, args)

        # Copy input collections, docker images and git repos.
        pi = copy_collections(pi, src, dst, args)
        copy_git_repos(pi, src, dst, args.dst_git_repo, args)
        copy_docker_images(pi, src, dst, args)

        # Update the fields of the pipeline instance with the copied
        # pipeline template.
        if pi.get('pipeline_template_uuid', None):
            pi['pipeline_template_uuid'] = pt['uuid']

    else:
        # not recursive
        logger.info("Copying only pipeline instance %s.", pi_uuid)
        logger.info("You are responsible for making sure all pipeline dependencies have been updated.")

    # Update the pipeline instance properties, and create the new
    # instance at dst.
    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
    pi['description'] = "Pipeline copied from {}\n\n{}".format(
        pi_uuid,
        pi['description'] if pi.get('description', None) else '')

    pi['owner_uuid'] = args.project_uuid

    del pi['uuid']

    new_pi = dst.pipeline_instances().create(body=pi, ensure_unique_name=True).execute(num_retries=args.retries)
    return new_pi
289
def filter_iter(arg):
    """Iterate a filter string-or-list.

    Pass in a filter field that can either be a string or list.
    This will iterate elements as if the field had been written as a list.
    """
    if isinstance(arg, basestring):
        return iter((arg,))
    else:
        return iter(arg)
300
def migrate_repository_filter(repo_filter, src_repository, dst_repository):
    """Update a single repository filter in-place for the destination.

    If the filter checks that the repository is src_repository, it is
    updated to check that the repository is dst_repository. If it does
    anything else, this function raises ValueError.
    """
    if src_repository is None:
        raise ValueError("component does not specify a source repository")
    elif dst_repository is None:
        raise ValueError("no destination repository specified to update repository filter")
    elif repo_filter[1:] == ['=', src_repository]:
        repo_filter[2] = dst_repository
    elif repo_filter[1:] == ['in', [src_repository]]:
        repo_filter[2] = [dst_repository]
    else:
        raise ValueError("repository filter is not a simple source match")
318
def migrate_script_version_filter(version_filter):
    """Update a single script_version filter in-place for the destination.

    Currently this function checks that all the filter operands are Git
    commit hashes. If they're not, it raises ValueError to indicate that
    the filter is not portable. It could be extended to make other
    transformations in the future.
    """
    if not all(COMMIT_HASH_RE.match(v) for v in filter_iter(version_filter[2])):
        raise ValueError("script_version filter is not limited to commit hashes")
329
def attr_filtered(filter_, *attr_names):
    """Return True if filter_ applies to any of attr_names, else False."""
    return any((name == 'any') or (name in attr_names)
               for name in filter_iter(filter_[0]))
334
@contextlib.contextmanager
def exception_handler(handler, *exc_types):
    """If any exc_types are raised in the block, call handler on the exception."""
    try:
        yield
    except exc_types as error:
        handler(error)
342
def migrate_components_filters(template_components, dst_git_repo):
    """Update template component filters in-place for the destination.

    template_components is a dictionary of components in a pipeline template.
    This method walks over each component's filters, and updates them to have
    identical semantics on the destination cluster.  It returns a list of
    error strings that describe what filters could not be updated safely.

    dst_git_repo is the name of the destination Git repository, which can
    be None if that is not known.
    """
    errors = []
    for cname, cspec in template_components.items():
        def add_error(errmsg):
            errors.append("{}: {}".format(cname, errmsg))
        if not isinstance(cspec, dict):
            add_error("value is not a component definition")
            continue
        src_repository = cspec.get('repository')
        filters = cspec.get('filters', [])
        if not isinstance(filters, list):
            add_error("filters are not a list")
            continue
        for cfilter in filters:
            if not (isinstance(cfilter, list) and (len(cfilter) == 3)):
                add_error("malformed filter {!r}".format(cfilter))
                continue
            if attr_filtered(cfilter, 'repository'):
                with exception_handler(add_error, ValueError):
                    migrate_repository_filter(cfilter, src_repository, dst_git_repo)
            if attr_filtered(cfilter, 'script_version'):
                with exception_handler(add_error, ValueError):
                    migrate_script_version_filter(cfilter)
    return errors
377
391
def copy_pipeline_template(pt_uuid, src, dst, args):
    """Copy pipeline template pt_uuid from src to dst.

    If args.recursive is True, also copy the template's collections, git
    repositories, and docker images.  Returns the new template record.
    """
    # fetch the pipeline template from the source instance
    pt = src.pipeline_templates().get(uuid=pt_uuid).execute(num_retries=args.retries)

    if not args.force_filters:
        filter_errors = migrate_components_filters(pt['components'], args.dst_git_repo)
        if filter_errors:
            abort("Template filters cannot be copied safely. Use --force-filters to copy anyway.\n" +
                  "\n".join(filter_errors))

    if args.recursive:
        check_git_availability()

        if not args.dst_git_repo:
            abort('--dst-git-repo is required when copying a pipeline recursively.')
        # Copy input collections, docker images and git repos.
        pt = copy_collections(pt, src, dst, args)
        copy_git_repos(pt, src, dst, args.dst_git_repo, args)
        copy_docker_images(pt, src, dst, args)

    pt['description'] = "Pipeline template copied from {}\n\n{}".format(
        pt_uuid,
        pt['description'] if pt.get('description', None) else '')
    pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
    del pt['uuid']

    pt['owner_uuid'] = args.project_uuid

    return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)
419
420
421
422
423
424
425
426
427
428
429
430
431
def copy_workflow(wf_uuid, src, dst, args):
    """Copy workflow wf_uuid from src to dst.

    If args.recursive is True, parse the workflow definition and also copy
    the collections and docker images it references.  Returns the new
    workflow record created at the destination.
    """
    # fetch the workflow from the source instance
    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)

    # copy collections and docker images
    if args.recursive:
        wf_def = yaml.safe_load(wf["definition"])
        if wf_def is not None:
            locations = []
            docker_images = {}
            graph = wf_def.get('$graph', None)
            if graph is not None:
                workflow_collections(graph, locations, docker_images)
            else:
                workflow_collections(wf_def, locations, docker_images)

            if locations:
                copy_collections(locations, src, dst, args)

            for image in docker_images:
                copy_docker_image(image, docker_images[image], src, dst, args)

    # copy the workflow itself
    del wf['uuid']
    wf['owner_uuid'] = args.project_uuid
    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
458
def workflow_collections(obj, locations, docker_images):
    """Recursively scan a workflow definition for dependencies.

    Walks dicts and lists in obj, appending every "keep:" location
    (without the prefix) to the locations list and recording every
    dockerImageId/dockerPull reference in the docker_images dict as
    image name -> tag (defaulting to 'latest').  Mutates its arguments;
    returns None.
    """
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None:
            if loc.startswith("keep:"):
                locations.append(loc[5:])

        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            tag = ds[1] if len(ds)==2 else 'latest'
            docker_images[ds[0]] = tag

        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)
477
478
479
480
481
482
483
484
485
486
487
488
489
def copy_collections(obj, src, dst, args):
    """Recursively copy all collections referenced by obj from src to dst.

    obj may be a string, dict, or list; any embedded collection uuid or
    portable data hash found in string values is copied and rewritten to
    the destination identifier.  Returns obj with identifiers replaced.
    """

    def copy_collection_fn(collection_match):
        """Helper function for regex substitution: copies a single collection,
        identified by the collection_match MatchObject, to the
        destination.  Returns the destination collection uuid (or the
        portable data hash if that's what src_id is).

        """
        src_id = collection_match.group(0)
        if src_id not in collections_copied:
            dst_col = copy_collection(src_id, src, dst, args)
            if src_id in [dst_col['uuid'], dst_col['portable_data_hash']]:
                collections_copied[src_id] = src_id
            else:
                collections_copied[src_id] = dst_col['uuid']
        return collections_copied[src_id]

    if isinstance(obj, basestring):
        # Copy any collections identified in this string to dst, replacing
        # them with the dst uuids as necessary.
        obj = arvados.util.portable_data_hash_pattern.sub(copy_collection_fn, obj)
        obj = arvados.util.collection_uuid_pattern.sub(copy_collection_fn, obj)
        return obj
    elif isinstance(obj, dict):
        return type(obj)((v, copy_collections(obj[v], src, dst, args))
                         for v in obj)
    elif isinstance(obj, list):
        return type(obj)(copy_collections(v, src, dst, args) for v in obj)
    return obj
520
def migrate_jobspec(jobspec, src, dst, dst_repo, args):
    """Copy a job's script to the destination repository, and update its record.

    Given a jobspec dictionary, this function finds the referenced script from
    src and copies it to dst and dst_repo. It also updates jobspec in place to
    refer to names on the destination.
    """
    repo = jobspec.get('repository')
    if repo is None:
        return
    # script_version is the "script_version" parameter from the source
    # component or job.  If no script_version was supplied in the
    # component or job, it is a mistake in the pipeline, but for the
    # purposes of copying the repository, default to "master".
    script_version = jobspec.get('script_version') or 'master'
    script_key = (repo, script_version)
    if script_key not in scripts_copied:
        copy_git_repo(repo, src, dst, dst_repo, script_version, args)
        scripts_copied.add(script_key)
    jobspec['repository'] = dst_repo
    repo_dir = local_repo_dir[repo]
    for version_key in ['script_version', 'supplied_script_version']:
        if version_key in jobspec:
            jobspec[version_key] = git_rev_parse(jobspec[version_key], repo_dir)
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
def copy_git_repos(p, src, dst, dst_repo, args):
    """Copy all git repositories referenced by pipeline p's components.

    Migrates both the component-level jobspec and, when present, the
    nested 'job' record of each component.
    """
    for component in p['components'].values():
        migrate_jobspec(component, src, dst, dst_repo, args)
        if 'job' in component:
            migrate_jobspec(component['job'], src, dst, dst_repo, args)
568
def total_collection_size(manifest_text):
    """Return the total number of bytes in this collection (excluding
    duplicate blocks)."""

    total_bytes = 0
    locators_seen = {}
    for line in manifest_text.splitlines():
        words = line.split()
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                continue  # this word isn't a locator, skip it
            if loc.md5sum not in locators_seen:
                locators_seen[loc.md5sum] = True
                total_bytes += loc.size

    return total_bytes
587
def create_collection_from(c, src, dst, args):
    """Create a new collection record on dst, and copy Docker metadata if
    available."""

    collection_uuid = c['uuid']
    del c['uuid']

    if not c["name"]:
        c['name'] = "copied from " + collection_uuid

    if 'properties' in c:
        del c['properties']

    c['owner_uuid'] = args.project_uuid

    dst_collection = dst.collections().create(body=c, ensure_unique_name=True).execute(num_retries=args.retries)

    # Create docker_image_repo+tag and docker_image_hash links
    # at the destination.
    for link_class in ("docker_image_repo+tag", "docker_image_hash"):
        docker_links = src.links().list(filters=[["head_uuid", "=", collection_uuid], ["link_class", "=", link_class]]).execute(num_retries=args.retries)['items']

        for src_link in docker_links:
            body = {key: src_link[key]
                    for key in ['link_class', 'name', 'properties']}
            body['head_uuid'] = dst_collection['uuid']
            body['owner_uuid'] = args.project_uuid

            lk = dst.links().create(body=body).execute(num_retries=args.retries)
            logger.debug('created dst link {}'.format(lk))

    return dst_collection
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
def copy_collection(obj_uuid, src, dst, args):
    """Copy the collection identified by obj_uuid from src to dst.

    obj_uuid may be either a collection uuid or a portable data hash.
    Unless args.force is set, an existing matching collection at the
    destination is reused.  Otherwise each data block is fetched from the
    source Keep and written to the destination Keep, and a new collection
    record is created.  Returns the destination collection record.
    """
    if arvados.util.keep_locator_pattern.match(obj_uuid):
        # If the obj_uuid is a portable data hash, it might refer to
        # multiple collections.  Find all collections with that portable
        # data hash, and prefer one that belongs to the same project as
        # the object being copied.
        srccol = src.collections().list(
            filters=[['portable_data_hash', '=', obj_uuid]],
            order="created_at asc"
        ).execute(num_retries=args.retries)

        items = srccol.get("items")

        if not items:
            logger.warning("Could not find collection with portable data hash %s", obj_uuid)
            return

        c = None

        if len(items) == 1:
            # There's only one collection with the PDH, so use that.
            c = items[0]
        if not c:
            # See if there is a collection that's in the same project
            # as the root item being copied.
            for i in items:
                if i.get("owner_uuid") == src_owner_uuid and i.get("name"):
                    c = i
                    break
        if not c:
            # Didn't find any collections located in the same project, so
            # pick the oldest collection that has a name assigned to it.
            for i in items:
                if i.get("name"):
                    c = i
                    break
        if not c:
            # None of the collections have names (?!), so just pick the
            # first one.
            c = items[0]

        # list() doesn't return manifest text (and we don't want it to,
        # because we don't need the same manifest text sent to us 50
        # times) so go and retrieve the collection object directly
        # which will include the manifest text.
        c = src.collections().get(uuid=c["uuid"]).execute(num_retries=args.retries)
    else:
        # Assume this is an actual collection uuid, so fetch it directly.
        c = src.collections().get(uuid=obj_uuid).execute(num_retries=args.retries)

    # If a collection with this hash already exists at the
    # destination, and 'force' is not true, just return that
    # collection.
    if not args.force:
        if 'portable_data_hash' in c:
            colhash = c['portable_data_hash']
        else:
            colhash = c['uuid']
        dstcol = dst.collections().list(
            filters=[['portable_data_hash', '=', colhash]]
        ).execute(num_retries=args.retries)
        if dstcol['items_available'] > 0:
            for d in dstcol['items']:
                if ((args.project_uuid == d['owner_uuid']) and
                    (c.get('name') == d['name']) and
                    (c['portable_data_hash'] == d['portable_data_hash'])):
                    return d
            c['manifest_text'] = dst.collections().get(
                uuid=dstcol['items'][0]['uuid']
            ).execute(num_retries=args.retries)['manifest_text']
            return create_collection_from(c, src, dst, args)

    # Fetch the collection's manifest.
    manifest = c['manifest_text']
    logger.debug("Copying collection %s with manifest: <%s>", obj_uuid, manifest)

    # Copy each block from src_keep to dst_keep.
    # Use the newly signed locators returned from dst_keep to build
    # a new manifest as we go.
    src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
    dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
    dst_manifest = ""
    dst_locators = {}
    bytes_written = 0
    bytes_expected = total_collection_size(manifest)
    if args.progress:
        progress_writer = ProgressWriter(human_progress)
    else:
        progress_writer = None

    for line in manifest.splitlines():
        words = line.split()
        dst_manifest += words[0]
        for word in words[1:]:
            try:
                loc = arvados.KeepLocator(word)
            except ValueError:
                # If 'word' can't be parsed as a locator,
                # presume it's a filename.
                dst_manifest += ' ' + word
                continue
            blockhash = loc.md5sum
            # copy this block if we haven't seen it before
            # (otherwise, just reuse the existing dst_locator)
            if blockhash not in dst_locators:
                logger.debug("Copying block %s (%s bytes)", blockhash, loc.size)
                if progress_writer:
                    progress_writer.report(obj_uuid, bytes_written, bytes_expected)
                data = src_keep.get(word)
                dst_locator = dst_keep.put(data)
                dst_locators[blockhash] = dst_locator
                bytes_written += loc.size
            dst_manifest += ' ' + dst_locators[blockhash]
        dst_manifest += "\n"

    if progress_writer:
        progress_writer.report(obj_uuid, bytes_written, bytes_expected)
        progress_writer.finish()

    # Copy the manifest and save the new collection.
    logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)

    c['manifest_text'] = dst_manifest
    return create_collection_from(c, src, dst, args)
767
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
    """Pick a reachable clone URL for repo_name on the given Arvados instance.

    Tries the repository's advertised clone_urls in order https, other
    (e.g. git+ssh), then http, probing each with `git ls-remote`.  Returns
    a (git_url, git_config) tuple where git_config is the extra `git -c`
    arguments needed for token-based http(s) auth.  Raises Exception if
    the repo cannot be identified or reached, or if only an insecure http
    URL works and allow_insecure_http is not set.
    """
    r = api.repositories().list(
        filters=[['name', '=', repo_name]]).execute(num_retries=retries)
    if r['items_available'] != 1:
        raise Exception('cannot identify repo {}; {} repos found'
                        .format(repo_name, r['items_available']))

    https_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("https:")]
    http_url = [c for c in r['items'][0]["clone_urls"] if c.startswith("http:")]
    other_url = [c for c in r['items'][0]["clone_urls"] if not c.startswith("http")]

    priority = https_url + other_url + http_url

    git_config = []
    git_url = None
    for url in priority:
        if url.startswith("http"):
            u = urllib.parse.urlsplit(url)
            baseurl = urllib.parse.urlunsplit((u.scheme, u.netloc, "", "", ""))
            # Supply the API token as the http password, and disable the
            # interactive password prompt (GIT_ASKPASS=/bin/false).
            git_config = ["-c", "credential.%s/.username=none" % baseurl,
                          "-c", "credential.%s/.helper=!cred(){ cat >/dev/null; if [ \"$1\" = get ]; then echo password=$ARVADOS_API_TOKEN; fi; };cred" % baseurl]
        else:
            git_config = []

        try:
            logger.debug("trying %s", url)
            arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
                                     env={"HOME": os.environ["HOME"],
                                          "ARVADOS_API_TOKEN": api.api_token,
                                          "GIT_ASKPASS": "/bin/false"})
        except arvados.errors.CommandFailedError:
            pass
        else:
            git_url = url
            break

    if not git_url:
        raise Exception('Cannot access git repository, tried {}'
                        .format(priority))

    if git_url.startswith("http:"):
        if allow_insecure_http:
            logger.warning("Using insecure git url %s but will allow this because %s", git_url, allow_insecure_http_opt)
        else:
            raise Exception("Refusing to use insecure git url %s, use %s if you really want this." % (git_url, allow_insecure_http_opt))

    return (git_url, git_config)
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
def copy_git_repo(src_git_repo, src, dst, dst_git_repo, script_version, args):
    """Copy commit script_version of src_git_repo to dst_git_repo.

    Clones the source repository (bare) into a temp dir recorded in
    local_repo_dir, creates a branch at script_version, and pushes it to
    the destination repository.  The branch name encodes the source url
    and commit so repeated copies are idempotent on the destination.
    """
    # Identify the fetch and push URLs for the git repositories.
    (src_git_url, src_git_config) = select_git_url(src, src_git_repo, args.retries, args.allow_git_http_src, "--allow-git-http-src")
    (dst_git_url, dst_git_config) = select_git_url(dst, dst_git_repo, args.retries, args.allow_git_http_dst, "--allow-git-http-dst")

    logger.debug('src_git_url: {}'.format(src_git_url))
    logger.debug('dst_git_url: {}'.format(dst_git_url))

    dst_branch = re.sub(r'\W+', '_', "{}_{}".format(src_git_url, script_version))

    # Copy git commits from src repo to dst repo.
    if src_git_repo not in local_repo_dir:
        local_repo_dir[src_git_repo] = tempfile.mkdtemp()
        arvados.util.run_command(
            ["git"] + src_git_config + ["clone", "--bare", src_git_url,
             local_repo_dir[src_git_repo]],
            cwd=os.path.dirname(local_repo_dir[src_git_repo]),
            env={"HOME": os.environ["HOME"],
                 "ARVADOS_API_TOKEN": src.api_token,
                 "GIT_ASKPASS": "/bin/false"})
        arvados.util.run_command(
            ["git", "remote", "add", "dst", dst_git_url],
            cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(
        ["git", "branch", dst_branch, script_version],
        cwd=local_repo_dir[src_git_repo])
    arvados.util.run_command(["git"] + dst_git_config + ["push", "dst", dst_branch],
                             cwd=local_repo_dir[src_git_repo],
                             env={"HOME": os.environ["HOME"],
                                  "ARVADOS_API_TOKEN": dst.api_token,
                                  "GIT_ASKPASS": "/bin/false"})
864
def copy_docker_images(pipeline, src, dst, args):
    """Copy any docker images named in the pipeline components'
    runtime_constraints field from src to dst."""

    logger.debug('copy_docker_images: {}'.format(pipeline['uuid']))
    for c_name, c_info in pipeline['components'].items():
        if ('runtime_constraints' in c_info and
            'docker_image' in c_info['runtime_constraints']):
            copy_docker_image(
                c_info['runtime_constraints']['docker_image'],
                c_info['runtime_constraints'].get('docker_image_tag', 'latest'),
                src, dst, args)
877
def copy_docker_image(docker_image, docker_image_tag, src, dst, args):
    """Copy the docker image identified by docker_image and
    docker_image_tag from src to dst. Create appropriate
    docker_image_repo+tag and docker_image_hash links at dst.

    """

    logger.debug('copying docker image {}:{}'.format(docker_image, docker_image_tag))

    # Find the link identifying this docker image.
    docker_image_list = arvados.commands.keepdocker.list_images_in_arv(
        src, args.retries, docker_image, docker_image_tag)
    if docker_image_list:
        image_uuid, image_info = docker_image_list[0]
        logger.debug('copying collection {} {}'.format(image_uuid, image_info))

        # Copy the collection it refers to.
        dst_image_col = copy_collection(image_uuid, src, dst, args)
    elif arvados.util.keep_locator_pattern.match(docker_image):
        dst_image_col = copy_collection(docker_image, src, dst, args)
    else:
        logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
901
def git_rev_parse(rev, repo):
    """Resolve `rev` to a full commit hash in the local checkout at `repo`."""
    gitout, giterr = arvados.util.run_command(
        ['git', 'rev-parse', rev], cwd=repo)
    return gitout.strip()
912
913
914
915
916
917
918
919
920
921
922
def uuid_type(api, object_uuid):
    """Return the Arvados schema class name for object_uuid, or None.

    A bare portable data hash (32 hex chars, '+', size, optional hints)
    is reported as 'Collection' without consulting the API; otherwise the
    uuid's middle segment is matched against the API schema's uuidPrefix
    values.
    """
    if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
        return 'Collection'
    p = object_uuid.split('-')
    if len(p) == 3:
        type_prefix = p[1]
        for k in api._schema.schemas:
            obj_class = api._schema.schemas[k].get('uuidPrefix', None)
            if type_prefix == obj_class:
                return k
    return None
934
def abort(msg, code=1):
    """Log msg and exit the program with the given status code."""
    logger.info("arv-copy: %s", msg)
    exit(code)
938
939
940
941
942
943
944
945
def machine_progress(obj_uuid, bytes_written, bytes_expected):
    """Render a machine-parseable one-line progress report.

    bytes_expected may be None, in which case -1 is reported.
    """
    return "{} {}: {} {} written {} total\n".format(
        sys.argv[0],
        os.getpid(),
        obj_uuid,
        bytes_written,
        -1 if (bytes_expected is None) else bytes_expected)
953
def human_progress(obj_uuid, bytes_written, bytes_expected):
    """Render a human-readable carriage-return progress line.

    Shows megabytes copied and a percentage when bytes_expected is
    nonzero, otherwise just the raw byte count.
    """
    if bytes_expected:
        return "\r{}: {}M / {}M {:.1%} ".format(
            obj_uuid,
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{}: {} ".format(obj_uuid, bytes_written)


class ProgressWriter(object):
    """Write progress reports (produced by a formatter such as
    human_progress or machine_progress) to stderr."""
    _progress_func = None
    outfile = sys.stderr

    def __init__(self, progress_func):
        self._progress_func = progress_func

    def report(self, obj_uuid, bytes_written, bytes_expected):
        if self._progress_func is not None:
            self.outfile.write(
                self._progress_func(obj_uuid, bytes_written, bytes_expected))

    def finish(self):
        self.outfile.write("\n")
977
if __name__ == '__main__':
    main()
980