1
2
3
4
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
9 import argparse
10 import arvados
11 import arvados.collection
12 import base64
13 import copy
14 import datetime
15 import errno
16 import fcntl
17 import fnmatch
18 import hashlib
19 import json
20 import logging
21 import os
22 import pwd
23 import re
24 import signal
25 import socket
26 import sys
27 import tempfile
28 import threading
29 import time
30 import traceback
31
32 from apiclient import errors as apiclient_errors
33 from arvados._version import __version__
34
35 import arvados.commands._util as arv_cmd
36
37 api_client = None
38
# Options that control HOW the data is stored in Keep.  This parser is not
# used directly; it is combined (via parents=[...]) into the full arv-put
# argument parser built elsewhere in this file.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

# Mutually exclusive group: manifest post-processing mode.
_group = upload_opts.add_mutually_exclusive_group()

# Deprecated/hidden option kept for backward compatibility (SUPPRESS hides
# it from --help output).
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Mutually exclusive group: output mode (stream / manifest / raw locators).
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--storage-classes', help="""
Specify comma separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

# Options that control a particular RUN of the command (destination project,
# naming, exclusions, progress reporting, resume/cache behavior).
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

# May be given multiple times; patterns accumulate into a list.
run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
directory, relative to the provided input dirs will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

# Mutually exclusive group: progress reporting style.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

# Mutually exclusive group: resume behavior (--resume is the default).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

# Mutually exclusive group: symlink handling (following is the default).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

# Mutually exclusive group: state-cache usage (caching is the default).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")
234
235 arg_parser = argparse.ArgumentParser(
236 description='Copy data from the local filesystem to Keep.',
237 parents=[upload_opts, run_opts, arv_cmd.retry_opt])
240 args = arg_parser.parse_args(arguments)
241
242 if len(args.paths) == 0:
243 args.paths = ['-']
244
245 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
246
247 if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
248 if args.filename:
249 arg_parser.error("""
250 --filename argument cannot be used when storing a directory or
251 multiple files.
252 """)
253
254
255 if (not (args.batch_progress or args.no_progress or args.silent)
256 and os.isatty(sys.stderr.fileno())):
257 args.progress = True
258
259
260 if not args.use_cache:
261 args.resume = False
262
263 if args.paths == ['-']:
264 if args.update_collection:
265 arg_parser.error("""
266 --update-collection cannot be used when reading from stdin.
267 """)
268 args.resume = False
269 args.use_cache = False
270 if not args.filename:
271 args.filename = 'stdin'
272
273
274 if len(args.exclude) > 0:
275 args.exclude = list(set(args.exclude))
276
277 return args
278
282
286
290
294
298
302
306 list.__init__(self)
307 self.dry_run = dry_run
308
313
331
334 CACHE_DIR = '.cache/arvados/arv-put'
335
337 self.cache_file = open(file_spec, 'a+')
338 self._lock_file(self.cache_file)
339 self.filename = self.cache_file.name
340
341 @classmethod
343 md5 = hashlib.md5()
344 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
345 realpaths = sorted(os.path.realpath(path) for path in args.paths)
346 md5.update(b'\0'.join([p.encode() for p in realpaths]))
347 if any(os.path.isdir(path) for path in realpaths):
348 md5.update(b'-1')
349 elif args.filename:
350 md5.update(args.filename.encode())
351 return os.path.join(
352 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
353 md5.hexdigest())
354
356 try:
357 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
358 except IOError:
359 raise ResumeCacheConflict("{} locked".format(fileobj.name))
360
362 self.cache_file.seek(0)
363 return json.load(self.cache_file)
364
365 - def check_cache(self, api_client=None, num_retries=0):
381
382 - def save(self, data):
383 try:
384 new_cache_fd, new_cache_name = tempfile.mkstemp(
385 dir=os.path.dirname(self.filename))
386 self._lock_file(new_cache_fd)
387 new_cache = os.fdopen(new_cache_fd, 'r+')
388 json.dump(data, new_cache)
389 os.rename(new_cache_name, self.filename)
390 except (IOError, OSError, ResumeCacheConflict) as error:
391 try:
392 os.unlink(new_cache_name)
393 except NameError:
394 pass
395 else:
396 self.cache_file.close()
397 self.cache_file = new_cache
398
400 self.cache_file.close()
401
403 try:
404 os.unlink(self.filename)
405 except OSError as error:
406 if error.errno != errno.ENOENT:
407 raise
408 self.close()
409
413
416 CACHE_DIR = '.cache/arvados/arv-put'
417 EMPTY_STATE = {
418 'manifest' : None,
419 'files' : {}
420 }
421
422 - def __init__(self, paths, resume=True, use_cache=True, reporter=None,
423 name=None, owner_uuid=None, api_client=None,
424 ensure_unique_name=False, num_retries=None,
425 put_threads=None, replication_desired=None, filename=None,
426 update_time=60.0, update_collection=None, storage_classes=None,
427 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
428 follow_links=True, exclude_paths=[], exclude_names=None):
429 self.paths = paths
430 self.resume = resume
431 self.use_cache = use_cache
432 self.update = False
433 self.reporter = reporter
434
435
436 self.bytes_expected = None
437 self.bytes_written = 0
438 self.bytes_skipped = 0
439 self.name = name
440 self.owner_uuid = owner_uuid
441 self.ensure_unique_name = ensure_unique_name
442 self.num_retries = num_retries
443 self.replication_desired = replication_desired
444 self.put_threads = put_threads
445 self.filename = filename
446 self.storage_classes = storage_classes
447 self._api_client = api_client
448 self._state_lock = threading.Lock()
449 self._state = None
450 self._current_files = []
451 self._cache_file = None
452 self._collection_lock = threading.Lock()
453 self._remote_collection = None
454 self._local_collection = None
455 self._file_paths = set()
456 self._stop_checkpointer = threading.Event()
457 self._checkpointer = threading.Thread(target=self._update_task)
458 self._checkpointer.daemon = True
459 self._update_task_time = update_time
460 self._files_to_upload = FileUploadList(dry_run=dry_run)
461 self._upload_started = False
462 self.logger = logger
463 self.dry_run = dry_run
464 self._checkpoint_before_quit = True
465 self.follow_links = follow_links
466 self.exclude_paths = exclude_paths
467 self.exclude_names = exclude_names
468
469 if not self.use_cache and self.resume:
470 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
471
472
473 if self.dry_run and (not self.use_cache or not self.resume):
474 raise ArvPutUploadIsPending()
475
476
477 self._setup_state(update_collection)
478
479
480
481 self._build_upload_list()
482
484 """
485 Scan the requested paths to count file sizes, excluding files & dirs if requested
486 and building the upload file list.
487 """
488
489
490 if not any([p for p in self.paths
491 if not (os.path.isfile(p) or os.path.isdir(p))]):
492 self.bytes_expected = 0
493
494 for path in self.paths:
495
496 if path == '-':
497 if self.dry_run:
498 raise ArvPutUploadIsPending()
499 self._write_stdin(self.filename or 'stdin')
500 elif not os.path.exists(path):
501 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
502 elif os.path.isdir(path):
503
504
505 orig_path = path
506 path = os.path.abspath(path)
507 if orig_path[-1:] == os.sep:
508
509
510
511 prefixdir = path
512 else:
513
514
515 prefixdir = os.path.dirname(path)
516 prefixdir += os.sep
517 for root, dirs, files in os.walk(path,
518 followlinks=self.follow_links):
519 root_relpath = os.path.relpath(root, path)
520 if root_relpath == '.':
521 root_relpath = ''
522
523 if self.exclude_paths:
524 dirs[:] = [d for d in dirs
525 if not any(pathname_match(
526 os.path.join(root_relpath, d), pat)
527 for pat in self.exclude_paths)]
528 files = [f for f in files
529 if not any(pathname_match(
530 os.path.join(root_relpath, f), pat)
531 for pat in self.exclude_paths)]
532
533 if self.exclude_names is not None:
534 dirs[:] = [d for d in dirs
535 if not self.exclude_names.match(d)]
536 files = [f for f in files
537 if not self.exclude_names.match(f)]
538
539 dirs.sort()
540 files.sort()
541 for f in files:
542 filepath = os.path.join(root, f)
543
544 if self.follow_links or (not os.path.islink(filepath)):
545 if self.bytes_expected is not None:
546 self.bytes_expected += os.path.getsize(filepath)
547 self._check_file(filepath,
548 os.path.join(root[len(prefixdir):], f))
549 else:
550 filepath = os.path.abspath(path)
551
552 if self.follow_links or (not os.path.islink(filepath)):
553 if self.bytes_expected is not None:
554 self.bytes_expected += os.path.getsize(filepath)
555 self._check_file(filepath,
556 self.filename or os.path.basename(path))
557
558
559 if self.dry_run:
560 raise ArvPutUploadNotPending()
561
562
563 for f in self.collection_file_paths(self._local_collection,
564 path_prefix=""):
565 if f != 'stdin' and f != self.filename and not f in self._file_paths:
566 self._local_collection.remove(f)
567
    def start(self, save_collection):
        """
        Start the checkpointer thread and upload the queued files.

        When save_collection is True, the collection record is created or
        updated in Arvados after a successful upload.  On error, the
        checkpointer thread is stopped and no final checkpoint is written.
        """
        self._checkpointer.start()
        try:
            # Report initial progress (and, when resuming, the bytes already
            # present in the local collection) before any file is uploaded.
            self._update()
            # Flag checked by the checkpointer thread to switch from its
            # fast startup poll to the steady-state update interval.
            self._upload_started = True
            self._upload_files()
        except (SystemExit, Exception) as e:
            # Skip the final checkpoint below: state may be inconsistent.
            self._checkpoint_before_quit = False
            # NOTE(review): SystemExit with code -2 is treated as a normal
            # interruption -- presumably raised by the signal handlers
            # installed via arv_cmd.install_signal_handlers() in main();
            # confirm before relying on this.
            if isinstance(e, PathDoesNotExistError):
                # Expected, user-facing error: no traceback needed.
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the checkpointer thread before touching shared state.
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & run one last checkpoint.
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()
607
609 if self.update:
610
611 for fp in self._file_paths:
612 remote_file = self._remote_collection.find(fp)
613 if not remote_file:
614
615 self._remote_collection.copy(fp, fp, self._local_collection)
616 elif remote_file != self._local_collection.find(fp):
617
618 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
619 else:
620
621 pass
622 self._remote_collection.save(storage_classes=self.storage_classes,
623 num_retries=self.num_retries)
624 else:
625 if self.storage_classes is None:
626 self.storage_classes = ['default']
627 self._local_collection.save_new(
628 name=self.name, owner_uuid=self.owner_uuid,
629 storage_classes=self.storage_classes,
630 ensure_unique_name=self.ensure_unique_name,
631 num_retries=self.num_retries)
632
634 if self.use_cache:
635 try:
636 os.unlink(self._cache_filename)
637 except OSError as error:
638
639 if error.errno != errno.ENOENT:
640 raise
641 self._cache_file.close()
642
654
656 """
657 Periodically called support task. File uploading is
658 asynchronous so we poll status from the collection.
659 """
660 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
661 self._update()
662
664 """
665 Update cached manifest text and report progress.
666 """
667 if self._upload_started:
668 with self._collection_lock:
669 self.bytes_written = self._collection_size(self._local_collection)
670 if self.use_cache:
671 if final:
672 manifest = self._local_collection.manifest_text()
673 else:
674
675 manifest = self._local_collection.manifest_text(strip=False,
676 normalize=False,
677 only_committed=True)
678
679 with self._state_lock:
680 self._state['manifest'] = manifest
681 if self.use_cache:
682 try:
683 self._save_state()
684 except Exception as e:
685 self.logger.error("Unexpected error trying to save cache file: {}".format(e))
686 else:
687 self.bytes_written = self.bytes_skipped
688
689 self.report_progress()
690
692 if self.reporter is not None:
693 self.reporter(self.bytes_written, self.bytes_expected)
694
696 output = self._local_collection.open(filename, 'wb')
697 self._write(sys.stdin, output)
698 output.close()
699
        """
        Decide whether *source* needs to be (re-)uploaded as *filename*, and
        queue it on self._files_to_upload when it does.

        Compares the cached mtime/size recorded in self._state['files'] with
        the file's current attributes and its presence in the local
        collection to choose between: skip, resume from an offset, or upload
        from scratch.
        """
        # Honor --no-follow-links: symlinked sources are skipped entirely.
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Remember every examined path so the remote collection can later be
        # pruned of files that no longer exist locally.
        self._file_paths.add(filename)

        with self._state_lock:
            # First time this file is seen: record its current mtime/size in
            # the resume cache for eventual repeated runs.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if the file was already uploaded (at least partially).
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected since the last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change since the last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # Not uploaded yet: upload it completely.
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Block access token expired: re-upload from scratch.
                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # Fully uploaded already: skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # Partially uploaded: resume from the uploaded size.
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache (uploaded copy larger than local file):
                # re-upload from scratch.
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from the cached data: re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # Dry-run mode: the first pending upload aborts the scan.
                # Close the cache file to release its lock before re-raising.
                self._cache_file.close()
                raise
771
773 for source, resume_offset, filename in self._files_to_upload:
774 with open(source, 'rb') as source_fd:
775 with self._state_lock:
776 self._state['files'][source]['mtime'] = os.path.getmtime(source)
777 self._state['files'][source]['size'] = os.path.getsize(source)
778 if resume_offset > 0:
779
780 output = self._local_collection.open(filename, 'ab')
781 source_fd.seek(resume_offset)
782 else:
783
784 output = self._local_collection.open(filename, 'wb')
785 self._write(source_fd, output)
786 output.close(flush=False)
787
788 - def _write(self, source_fd, output):
794
796 return self._remote_collection if self.update else self._local_collection
797
799 """
800 Create a new cache file or load a previously existing one.
801 """
802
803 if update_collection and re.match(arvados.util.collection_uuid_pattern,
804 update_collection):
805 try:
806 self._remote_collection = arvados.collection.Collection(
807 update_collection, api_client=self._api_client)
808 except arvados.errors.ApiError as error:
809 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
810 else:
811 self.update = True
812 elif update_collection:
813
814 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
815
816 if self.use_cache:
817
818 md5 = hashlib.md5()
819 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
820 realpaths = sorted(os.path.realpath(path) for path in self.paths)
821 md5.update(b'\0'.join([p.encode() for p in realpaths]))
822 if self.filename:
823 md5.update(self.filename.encode())
824 cache_filename = md5.hexdigest()
825 cache_filepath = os.path.join(
826 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
827 cache_filename)
828 if self.resume and os.path.exists(cache_filepath):
829 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
830 self._cache_file = open(cache_filepath, 'a+')
831 else:
832
833 self.logger.info("Creating new cache file at {}".format(cache_filepath))
834 self._cache_file = open(cache_filepath, 'w+')
835 self._cache_filename = self._cache_file.name
836 self._lock_file(self._cache_file)
837 self._cache_file.seek(0)
838
839 with self._state_lock:
840 if self.use_cache:
841 try:
842 self._state = json.load(self._cache_file)
843 if not set(['manifest', 'files']).issubset(set(self._state.keys())):
844
845 self._state = copy.deepcopy(self.EMPTY_STATE)
846 except ValueError:
847
848 self._state = copy.deepcopy(self.EMPTY_STATE)
849 else:
850 self.logger.info("No cache usage requested for this run.")
851
852 self._state = copy.deepcopy(self.EMPTY_STATE)
853
854 self._local_collection = arvados.collection.Collection(
855 self._state['manifest'],
856 replication_desired=self.replication_desired,
857 put_threads=self.put_threads,
858 api_client=self._api_client)
859
870
872 try:
873 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
874 except IOError:
875 raise ResumeCacheConflict("{} locked".format(fileobj.name))
876
878 """
879 Atomically save current state into cache.
880 """
881 with self._state_lock:
882
883
884
885 state = json.dumps(self._state)
886 try:
887 new_cache = tempfile.NamedTemporaryFile(
888 mode='w+',
889 dir=os.path.dirname(self._cache_filename), delete=False)
890 self._lock_file(new_cache)
891 new_cache.write(state)
892 new_cache.flush()
893 os.fsync(new_cache)
894 os.rename(new_cache.name, self._cache_filename)
895 except (IOError, OSError, ResumeCacheConflict) as error:
896 self.logger.error("There was a problem while saving the cache file: {}".format(error))
897 try:
898 os.unlink(new_cache_name)
899 except NameError:
900 pass
901 else:
902 self._cache_file.close()
903 self._cache_file = new_cache
904
907
910
920
921 - def manifest_text(self, stream_name=".", strip=False, normalize=False):
923
925 """
926 Return a list of datablock locators, recursively navigating
927 through subcollections
928 """
929 if isinstance(item, arvados.arvfile.ArvadosFile):
930 if item.size() == 0:
931
932 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
933 else:
934 locators = []
935 for segment in item.segments():
936 loc = segment.locator
937 locators.append(loc)
938 return locators
939 elif isinstance(item, arvados.collection.Collection):
940 l = [self._datablocks_on_item(x) for x in listvalues(item)]
941
942
943 return [loc for sublist in l for loc in sublist]
944 else:
945 return None
946
953
954 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
955 os.getpid())
962 name = pathname.split(os.sep)
963
964 pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
965 if len(name) != len(pat):
966 return False
967 for i in range(len(name)):
968 if not fnmatch.fnmatch(name[i], pat[i]):
969 return False
970 return True
971
973 return _machine_format.format(
974 bytes_written, -1 if (bytes_expected is None) else bytes_expected)
975
977 if bytes_expected:
978 return "\r{}M / {}M {:.1%} ".format(
979 bytes_written >> 20, bytes_expected >> 20,
980 float(bytes_written) / bytes_expected)
981 else:
982 return "\r{} ".format(bytes_written)
983
985 def write_progress(bytes_written, bytes_expected):
986 outfile.write(progress_func(bytes_written, bytes_expected))
987 return write_progress
988
999
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    """Command-line entry point: upload the requested paths to Keep.

    Returns the produced output string (manifest text, raw locator list,
    collection UUID, or portable data hash) and also writes it to *stdout*
    unless --silent was given.  Exits the process with a nonzero status on
    error (2 means a --dry-run found pending uploads).
    """
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Determine the name to use for the collection.
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project.
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Split the --storage-classes argument (only one class supported so far).
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().split(',')
        if len(storage_classes) > 1:
            logger.error("Multiple storage classes are not supported currently.")
            sys.exit(1)

    # Set up exclusions from all the --exclude arguments provided.  Two kinds
    # of patterns are supported:
    # 1)   --exclude '*.jpg'   (name pattern: matches the bare file/dir name
    #                           anywhere in the tree)
    # 2.1) --exclude 'foo/bar' (path pattern: matches the whole path relative
    #                           to any input dir argument)
    # 2.2) --exclude './*.jpg' (path pattern matching names directly below
    #                           the input dir)
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        for p in args.exclude:
            # Only relative path patterns are allowed.
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # Path patterns must not escape the input dir via '..'.
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        # BUGFIX: message previously read "that include or
                        # '..'", dropping the object of "include".
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern.
                exclude_paths.append(p)
            else:
                # Name-only search pattern.
                name_patterns.append(p)
        # For name-only matching, combine all patterns into a single regexp
        # for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns actually in use, in case shell expansion
        # changed them.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # For humans, warn that the expected-bytes calculation over directories
    # can take a while.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths=args.paths,
                                 resume=args.resume,
                                 use_cache=args.use_cache,
                                 filename=args.filename,
                                 reporter=reporter,
                                 api_client=api_client,
                                 num_retries=args.retries,
                                 replication_desired=args.replication,
                                 put_threads=args.threads,
                                 name=collection_name,
                                 owner_uuid=project_uuid,
                                 ensure_unique_name=True,
                                 update_collection=args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            " Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run found pending files to be uploaded.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload.
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            " Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:
        # Print a newline to separate the progress line from the output.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the produced output (unless silenced or there was an error).
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    return output
1206
1207
1208 if __name__ == '__main__':
1209 main()
1210