arvados.commands.put
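This module implements the arv-put command, which copies data from the local filesystem to Keep. By default the uploaded data is saved as a Collection object in Arvados and the collection UUID is printed on stdout.

The tool is normally invoked from the shell, but the module's main() entry point can also be called directly. A minimal sketch (assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured in the environment; the collection name and directory path are illustrative):

import sys
from arvados.commands.put import main

# Upload the contents of ./data_dir into a new collection named 'my-upload'.
# main() writes the collection UUID to stdout and also returns it.
locator = main(arguments=['--name', 'my-upload', 'data_dir/'],
               stdout=sys.stdout, stderr=sys.stderr)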
Command-line options are parsed by parse_arguments(), which also normalizes the result:

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args
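A short sketch of the normalization above (assuming the module is importable; no Arvados connection is needed just to parse arguments):

from arvados.commands.put import parse_arguments

args = parse_arguments(['/dev/stdin'])
# "/dev/stdin" is rewritten to "-"; reading from stdin disables resume and
# caching and gives the stored file the default manifest name "stdin".
assert args.paths == ['-']
assert args.filename == 'stdin'
assert args.resume is False and args.use_cache is False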
The module defines several Exception subclasses with empty bodies; each signals a specific failure or dry-run outcome:

- PathDoesNotExistError: an input path given on the command line does not exist.
- CollectionUpdateError: the collection named by --update-collection cannot be read, or its locator has an unknown format.
- ResumeCacheConflict: the resume cache file is already locked by another arv-put process.
- ResumeCacheInvalidError: the cached manifest carries invalid or expired block signatures (for example, it was created with a different user's credentials).
- ArvPutArgumentConflict: mutually incompatible options were combined, such as requesting resume while caching is disabled.
- ArvPutUploadIsPending: raised under --dry-run as soon as it is known that at least one file would need uploading; main() turns this into exit code 2.
- ArvPutUploadNotPending: raised under --dry-run when no files need uploading; main() exits 0.
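The two dry-run exceptions surface to callers as process exit codes, so scripts can check for pending uploads without touching the Python API. A sketch (assumes the arv-put executable is on PATH; 'data_dir/' is illustrative):

import subprocess

# Exit status 2 means at least one file would be uploaded (ArvPutUploadIsPending);
# 0 means everything is already in Keep (ArvPutUploadNotPending).
result = subprocess.run(['arv-put', '--dry-run', 'data_dir/'])
if result.returncode == 2:
    print("upload needed")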
FileUploadList is a list subclass used to queue pending uploads; in dry-run mode, append() aborts by raising ArvPutUploadIsPending:

class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
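A minimal sketch of the dry-run behavior (the tuple contents mirror what ArvPutUploadJob appends: source path, resume offset, and name in the collection):

from arvados.commands.put import FileUploadList, ArvPutUploadIsPending

uploads = FileUploadList(dry_run=True)
try:
    # Appending any pending upload in dry-run mode raises immediately.
    uploads.append(('notes.txt', 0, 'notes.txt'))
except ArvPutUploadIsPending:
    print("at least one file is pending upload")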
ArvPutLogFormatter appends the X-Request-Id to the first DEBUG or ERROR record that is logged:

class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
Formatter instances are used to convert a LogRecord to text.
Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, “%(message)s”, “{message}”, or “${message}”, is used.
The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user’s message and arguments are pre- formatted into a LogRecord’s message attribute. Currently, the useful attributes in a LogRecord are described by:
%(name)s Name of the logger (logging channel) %(levelno)s Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL) %(levelname)s Text logging level for the message ("DEBUG”, “INFO”, “WARNING”, “ERROR”, “CRITICAL") %(pathname)s Full pathname of the source file where the logging call was issued (if available) %(filename)s Filename portion of pathname %(module)s Module (name portion of filename) %(lineno)d Source line number where the logging call was issued (if available) %(funcName)s Function name %(created)f Time when the LogRecord was created (time.time() return value) %(asctime)s Textual time when the LogRecord was created %(msecs)d Millisecond portion of the creation time %(relativeCreated)d Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time) %(thread)d Thread ID (if available) %(threadName)s Thread name (if available) %(process)d Process ID (if available) %(message)s The result of record.getMessage(), computed just as the record is emitted
346 def __init__(self, request_id): 347 self.err_fmtr = logging.Formatter( 348 arvados.log_format+' (X-Request-Id: {})'.format(request_id), 349 arvados.log_date_format)
Initialize the formatter with specified format strings.
Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.
Use a style parameter of ‘%’, ‘{’ or ‘$’ to specify that you want to
use one of %-formatting, str.format()
({}
) formatting or
string.Template
formatting in your format string.
Changed in version 3.2:
Added the style
parameter.
351 def format(self, record): 352 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)): 353 self.request_id_informed = True 354 return self.err_fmtr.format(record) 355 return self.std_fmtr.format(record)
Format the specified record as text.
The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
Inherited Members
- logging.Formatter
- converter
- datefmt
- default_time_format
- default_msec_format
- formatTime
- formatException
- usesTime
- formatMessage
- formatStack
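main(), shown near the end of this module, attaches this formatter to the first handler of the 'arvados' logger (assumed to already exist, as the code below relies on). A minimal sketch of that wiring, with illustrative log messages:

import logging
import arvados.util

request_id = arvados.util.new_request_id()
formatter = ArvPutLogFormatter(request_id)
logging.getLogger('arvados').handlers[0].setFormatter(formatter)

logger = logging.getLogger('arvados.arv_put')
logger.info('upload starting')    # standard format
logger.error('upload failed')     # the first DEBUG/ERROR record also carries (X-Request-Id: ...)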
358class ResumeCache(object): 359 CACHE_DIR = '.cache/arvados/arv-put' 360 361 def __init__(self, file_spec): 362 self.cache_file = open(file_spec, 'a+') 363 self._lock_file(self.cache_file) 364 self.filename = self.cache_file.name 365 366 @classmethod 367 def make_path(cls, args): 368 md5 = hashlib.md5() 369 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode()) 370 realpaths = sorted(os.path.realpath(path) for path in args.paths) 371 md5.update(b'\0'.join([p.encode() for p in realpaths])) 372 if any(os.path.isdir(path) for path in realpaths): 373 md5.update(b'-1') 374 elif args.filename: 375 md5.update(args.filename.encode()) 376 return os.path.join( 377 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'), 378 md5.hexdigest()) 379 380 def _lock_file(self, fileobj): 381 try: 382 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) 383 except IOError: 384 raise ResumeCacheConflict(u"{} locked".format(fileobj.name)) 385 386 def load(self): 387 self.cache_file.seek(0) 388 return json.load(self.cache_file) 389 390 def check_cache(self, api_client=None, num_retries=0): 391 try: 392 state = self.load() 393 locator = None 394 try: 395 if "_finished_streams" in state and len(state["_finished_streams"]) > 0: 396 locator = state["_finished_streams"][0][1][0] 397 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0: 398 locator = state["_current_stream_locators"][0] 399 if locator is not None: 400 kc = arvados.keep.KeepClient(api_client=api_client) 401 kc.head(locator, num_retries=num_retries) 402 except Exception as e: 403 self.restart() 404 except (ValueError): 405 pass 406 407 def save(self, data): 408 try: 409 new_cache_fd, new_cache_name = tempfile.mkstemp( 410 dir=os.path.dirname(self.filename)) 411 self._lock_file(new_cache_fd) 412 new_cache = os.fdopen(new_cache_fd, 'r+') 413 json.dump(data, new_cache) 414 os.rename(new_cache_name, self.filename) 415 except (IOError, OSError, ResumeCacheConflict): 416 try: 417 os.unlink(new_cache_name) 418 except NameError: # mkstemp failed. 419 pass 420 else: 421 self.cache_file.close() 422 self.cache_file = new_cache 423 424 def close(self): 425 self.cache_file.close() 426 427 def destroy(self): 428 try: 429 os.unlink(self.filename) 430 except OSError as error: 431 if error.errno != errno.ENOENT: # That's what we wanted anyway. 432 raise 433 self.close() 434 435 def restart(self): 436 self.destroy() 437 self.__init__(self.filename)
366 @classmethod 367 def make_path(cls, args): 368 md5 = hashlib.md5() 369 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode()) 370 realpaths = sorted(os.path.realpath(path) for path in args.paths) 371 md5.update(b'\0'.join([p.encode() for p in realpaths])) 372 if any(os.path.isdir(path) for path in realpaths): 373 md5.update(b'-1') 374 elif args.filename: 375 md5.update(args.filename.encode()) 376 return os.path.join( 377 arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'), 378 md5.hexdigest())
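make_path() derives the cache location from the API host and the sorted, canonicalized input paths (plus the filename override when no directory is among the inputs), so reruns with the same inputs find the same cache file. A hypothetical sketch, using argparse.Namespace to stand in for parsed arguments:

import argparse

args = argparse.Namespace(paths=['./dataset'], filename=None)   # hypothetical inputs
cache_path = ResumeCache.make_path(args)      # ~/.cache/arvados/arv-put/<md5 hex digest>
cache = ResumeCache(cache_path)               # takes an exclusive flock on the file
try:
    state = cache.load()                      # previously saved JSON state, if any
except ValueError:
    state = {}                                # empty or brand-new cache file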
390 def check_cache(self, api_client=None, num_retries=0): 391 try: 392 state = self.load() 393 locator = None 394 try: 395 if "_finished_streams" in state and len(state["_finished_streams"]) > 0: 396 locator = state["_finished_streams"][0][1][0] 397 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0: 398 locator = state["_current_stream_locators"][0] 399 if locator is not None: 400 kc = arvados.keep.KeepClient(api_client=api_client) 401 kc.head(locator, num_retries=num_retries) 402 except Exception as e: 403 self.restart() 404 except (ValueError): 405 pass
407 def save(self, data): 408 try: 409 new_cache_fd, new_cache_name = tempfile.mkstemp( 410 dir=os.path.dirname(self.filename)) 411 self._lock_file(new_cache_fd) 412 new_cache = os.fdopen(new_cache_fd, 'r+') 413 json.dump(data, new_cache) 414 os.rename(new_cache_name, self.filename) 415 except (IOError, OSError, ResumeCacheConflict): 416 try: 417 os.unlink(new_cache_name) 418 except NameError: # mkstemp failed. 419 pass 420 else: 421 self.cache_file.close() 422 self.cache_file = new_cache
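save() uses the write-to-a-temporary-file-then-rename pattern so an interrupted write never corrupts the existing cache. The same idea in isolation, as a sketch rather than the method itself:

import json
import os
import tempfile

def atomic_json_dump(data, target_path):
    # Write next to the target so rename() stays on one filesystem and is atomic;
    # readers either see the old file or the complete new one, never a partial write.
    fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(target_path) or '.')
    try:
        with os.fdopen(fd, 'w') as tmp:
            json.dump(data, tmp)
        os.rename(tmp_name, target_path)
    except Exception:
        os.unlink(tmp_name)
        raise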
440class ArvPutUploadJob(object): 441 CACHE_DIR = '.cache/arvados/arv-put' 442 EMPTY_STATE = { 443 'manifest' : None, # Last saved manifest checkpoint 444 'files' : {} # Previous run file list: {path : {size, mtime}} 445 } 446 447 def __init__(self, paths, resume=True, use_cache=True, reporter=None, 448 name=None, owner_uuid=None, api_client=None, batch_mode=False, 449 ensure_unique_name=False, num_retries=None, 450 put_threads=None, replication_desired=None, filename=None, 451 update_time=60.0, update_collection=None, storage_classes=None, 452 logger=logging.getLogger('arvados.arv_put'), dry_run=False, 453 follow_links=True, exclude_paths=[], exclude_names=None, 454 trash_at=None): 455 self.paths = paths 456 self.resume = resume 457 self.use_cache = use_cache 458 self.batch_mode = batch_mode 459 self.update = False 460 self.reporter = reporter 461 # This will set to 0 before start counting, if no special files are going 462 # to be read. 463 self.bytes_expected = None 464 self.bytes_written = 0 465 self.bytes_skipped = 0 466 self.name = name 467 self.owner_uuid = owner_uuid 468 self.ensure_unique_name = ensure_unique_name 469 self.num_retries = num_retries 470 self.replication_desired = replication_desired 471 self.put_threads = put_threads 472 self.filename = filename 473 self.storage_classes = storage_classes 474 self._api_client = api_client 475 self._state_lock = threading.Lock() 476 self._state = None # Previous run state (file list & manifest) 477 self._current_files = [] # Current run file list 478 self._cache_file = None 479 self._collection_lock = threading.Lock() 480 self._remote_collection = None # Collection being updated (if asked) 481 self._local_collection = None # Collection from previous run manifest 482 self._file_paths = set() # Files to be updated in remote collection 483 self._stop_checkpointer = threading.Event() 484 self._checkpointer = threading.Thread(target=self._update_task) 485 self._checkpointer.daemon = True 486 self._update_task_time = update_time # How many seconds wait between update runs 487 self._files_to_upload = FileUploadList(dry_run=dry_run) 488 self._upload_started = False 489 self.logger = logger 490 self.dry_run = dry_run 491 self._checkpoint_before_quit = True 492 self.follow_links = follow_links 493 self.exclude_paths = exclude_paths 494 self.exclude_names = exclude_names 495 self._trash_at = trash_at 496 497 if self._trash_at is not None: 498 if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]: 499 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta') 500 if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None: 501 raise TypeError('provided trash_at datetime should be timezone-naive') 502 503 if not self.use_cache and self.resume: 504 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False') 505 506 # Check for obvious dry-run responses 507 if self.dry_run and (not self.use_cache or not self.resume): 508 raise ArvPutUploadIsPending() 509 510 # Load cached data if any and if needed 511 self._setup_state(update_collection) 512 513 # Build the upload file list, excluding requested files and counting the 514 # bytes expected to be uploaded. 515 self._build_upload_list() 516 517 def _build_upload_list(self): 518 """ 519 Scan the requested paths to count file sizes, excluding requested files 520 and dirs and building the upload file list. 521 """ 522 # If there aren't special files to be read, reset total bytes count to zero 523 # to start counting. 
524 if not any([p for p in self.paths 525 if not (os.path.isfile(p) or os.path.isdir(p))]): 526 self.bytes_expected = 0 527 528 for path in self.paths: 529 # Test for stdin first, in case some file named '-' exist 530 if path == '-': 531 if self.dry_run: 532 raise ArvPutUploadIsPending() 533 self._write_stdin(self.filename or 'stdin') 534 elif not os.path.exists(path): 535 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path)) 536 elif (not self.follow_links) and os.path.islink(path): 537 self.logger.warning("Skipping symlink '{}'".format(path)) 538 continue 539 elif os.path.isdir(path): 540 # Use absolute paths on cache index so CWD doesn't interfere 541 # with the caching logic. 542 orig_path = path 543 path = os.path.abspath(path) 544 if orig_path[-1:] == os.sep: 545 # When passing a directory reference with a trailing slash, 546 # its contents should be uploaded directly to the 547 # collection's root. 548 prefixdir = path 549 else: 550 # When passing a directory reference with no trailing slash, 551 # upload the directory to the collection's root. 552 prefixdir = os.path.dirname(path) 553 prefixdir += os.sep 554 for root, dirs, files in os.walk(path, 555 followlinks=self.follow_links): 556 root_relpath = os.path.relpath(root, path) 557 if root_relpath == '.': 558 root_relpath = '' 559 # Exclude files/dirs by full path matching pattern 560 if self.exclude_paths: 561 dirs[:] = [d for d in dirs 562 if not any(pathname_match( 563 os.path.join(root_relpath, d), pat) 564 for pat in self.exclude_paths)] 565 files = [f for f in files 566 if not any(pathname_match( 567 os.path.join(root_relpath, f), pat) 568 for pat in self.exclude_paths)] 569 # Exclude files/dirs by name matching pattern 570 if self.exclude_names is not None: 571 dirs[:] = [d for d in dirs 572 if not self.exclude_names.match(d)] 573 files = [f for f in files 574 if not self.exclude_names.match(f)] 575 # Make os.walk()'s dir traversing order deterministic 576 dirs.sort() 577 files.sort() 578 for f in files: 579 filepath = os.path.join(root, f) 580 if not os.path.isfile(filepath): 581 self.logger.warning("Skipping non-regular file '{}'".format(filepath)) 582 continue 583 # Add its size to the total bytes count (if applicable) 584 if self.follow_links or (not os.path.islink(filepath)): 585 if self.bytes_expected is not None: 586 self.bytes_expected += os.path.getsize(filepath) 587 self._check_file(filepath, 588 os.path.join(root[len(prefixdir):], f)) 589 else: 590 filepath = os.path.abspath(path) 591 # Add its size to the total bytes count (if applicable) 592 if self.follow_links or (not os.path.islink(filepath)): 593 if self.bytes_expected is not None: 594 self.bytes_expected += os.path.getsize(filepath) 595 self._check_file(filepath, 596 self.filename or os.path.basename(path)) 597 # If dry-mode is on, and got up to this point, then we should notify that 598 # there aren't any file to upload. 599 if self.dry_run: 600 raise ArvPutUploadNotPending() 601 # Remove local_collection's files that don't exist locally anymore, so the 602 # bytes_written count is correct. 603 for f in self.collection_file_paths(self._local_collection, 604 path_prefix=""): 605 if f != 'stdin' and f != self.filename and not f in self._file_paths: 606 self._local_collection.remove(f) 607 608 def start(self, save_collection): 609 """ 610 Start supporting thread & file uploading 611 """ 612 self._checkpointer.start() 613 try: 614 # Update bytes_written from current local collection and 615 # report initial progress. 
616 self._update() 617 # Actual file upload 618 self._upload_started = True # Used by the update thread to start checkpointing 619 self._upload_files() 620 except (SystemExit, Exception) as e: 621 self._checkpoint_before_quit = False 622 # Log stack trace only when Ctrl-C isn't pressed (SIGINT) 623 # Note: We're expecting SystemExit instead of 624 # KeyboardInterrupt because we have a custom signal 625 # handler in place that raises SystemExit with the catched 626 # signal's code. 627 if isinstance(e, PathDoesNotExistError): 628 # We aren't interested in the traceback for this case 629 pass 630 elif not isinstance(e, SystemExit) or e.code != -2: 631 self.logger.warning("Abnormal termination:\n{}".format( 632 traceback.format_exc())) 633 raise 634 finally: 635 if not self.dry_run: 636 # Stop the thread before doing anything else 637 self._stop_checkpointer.set() 638 self._checkpointer.join() 639 if self._checkpoint_before_quit: 640 # Commit all pending blocks & one last _update() 641 self._local_collection.manifest_text() 642 self._update(final=True) 643 if save_collection: 644 self.save_collection() 645 if self.use_cache: 646 self._cache_file.close() 647 648 def _collection_trash_at(self): 649 """ 650 Returns the trash date that the collection should use at save time. 651 Takes into account absolute/relative trash_at values requested 652 by the user. 653 """ 654 if type(self._trash_at) == datetime.timedelta: 655 # Get an absolute datetime for trash_at 656 return datetime.datetime.utcnow() + self._trash_at 657 return self._trash_at 658 659 def save_collection(self): 660 if self.update: 661 # Check if files should be updated on the remote collection. 662 for fp in self._file_paths: 663 remote_file = self._remote_collection.find(fp) 664 if not remote_file: 665 # File don't exist on remote collection, copy it. 666 self._remote_collection.copy(fp, fp, self._local_collection) 667 elif remote_file != self._local_collection.find(fp): 668 # A different file exist on remote collection, overwrite it. 669 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True) 670 else: 671 # The file already exist on remote collection, skip it. 672 pass 673 self._remote_collection.save(num_retries=self.num_retries, 674 trash_at=self._collection_trash_at()) 675 else: 676 if len(self._local_collection) == 0: 677 self.logger.warning("No files were uploaded, skipping collection creation.") 678 return 679 self._local_collection.save_new( 680 name=self.name, owner_uuid=self.owner_uuid, 681 ensure_unique_name=self.ensure_unique_name, 682 num_retries=self.num_retries, 683 trash_at=self._collection_trash_at()) 684 685 def destroy_cache(self): 686 if self.use_cache: 687 try: 688 os.unlink(self._cache_filename) 689 except OSError as error: 690 # That's what we wanted anyway. 691 if error.errno != errno.ENOENT: 692 raise 693 self._cache_file.close() 694 695 def _collection_size(self, collection): 696 """ 697 Recursively get the total size of the collection 698 """ 699 size = 0 700 for item in listvalues(collection): 701 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection): 702 size += self._collection_size(item) 703 else: 704 size += item.size() 705 return size 706 707 def _update_task(self): 708 """ 709 Periodically called support task. File uploading is 710 asynchronous so we poll status from the collection. 
711 """ 712 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time): 713 self._update() 714 715 def _update(self, final=False): 716 """ 717 Update cached manifest text and report progress. 718 """ 719 if self._upload_started: 720 with self._collection_lock: 721 self.bytes_written = self._collection_size(self._local_collection) 722 if self.use_cache: 723 if final: 724 manifest = self._local_collection.manifest_text() 725 else: 726 # Get the manifest text without comitting pending blocks 727 manifest = self._local_collection.manifest_text(strip=False, 728 normalize=False, 729 only_committed=True) 730 # Update cache 731 with self._state_lock: 732 self._state['manifest'] = manifest 733 if self.use_cache: 734 try: 735 self._save_state() 736 except Exception as e: 737 self.logger.error("Unexpected error trying to save cache file: {}".format(e)) 738 # Keep remote collection's trash_at attribute synced when using relative expire dates 739 if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta: 740 try: 741 self._api_client.collections().update( 742 uuid=self._remote_collection.manifest_locator(), 743 body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")} 744 ).execute(num_retries=self.num_retries) 745 except Exception as e: 746 self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e)) 747 else: 748 self.bytes_written = self.bytes_skipped 749 # Call the reporter, if any 750 self.report_progress() 751 752 def report_progress(self): 753 if self.reporter is not None: 754 self.reporter(self.bytes_written, self.bytes_expected) 755 756 def _write_stdin(self, filename): 757 output = self._local_collection.open(filename, 'wb') 758 self._write(sys.stdin.buffer, output) 759 output.close() 760 761 def _check_file(self, source, filename): 762 """ 763 Check if this file needs to be uploaded 764 """ 765 # Ignore symlinks when requested 766 if (not self.follow_links) and os.path.islink(source): 767 return 768 resume_offset = 0 769 should_upload = False 770 new_file_in_cache = False 771 # Record file path for updating the remote collection before exiting 772 self._file_paths.add(filename) 773 774 with self._state_lock: 775 # If no previous cached data on this file, store it for an eventual 776 # repeated run. 777 if source not in self._state['files']: 778 self._state['files'][source] = { 779 'mtime': os.path.getmtime(source), 780 'size' : os.path.getsize(source) 781 } 782 new_file_in_cache = True 783 cached_file_data = self._state['files'][source] 784 785 # Check if file was already uploaded (at least partially) 786 file_in_local_collection = self._local_collection.find(filename) 787 788 # If not resuming, upload the full file. 789 if not self.resume: 790 should_upload = True 791 # New file detected from last run, upload it. 792 elif new_file_in_cache: 793 should_upload = True 794 # Local file didn't change from last run. 795 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source): 796 if not file_in_local_collection: 797 # File not uploaded yet, upload it completely 798 should_upload = True 799 elif file_in_local_collection.permission_expired(): 800 # Permission token expired, re-upload file. This will change whenever 801 # we have a API for refreshing tokens. 
802 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename)) 803 should_upload = True 804 self._local_collection.remove(filename) 805 elif cached_file_data['size'] == file_in_local_collection.size(): 806 # File already there, skip it. 807 self.bytes_skipped += cached_file_data['size'] 808 elif cached_file_data['size'] > file_in_local_collection.size(): 809 # File partially uploaded, resume! 810 resume_offset = file_in_local_collection.size() 811 self.bytes_skipped += resume_offset 812 should_upload = True 813 else: 814 # Inconsistent cache, re-upload the file 815 should_upload = True 816 self._local_collection.remove(filename) 817 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source)) 818 # Local file differs from cached data, re-upload it. 819 else: 820 if file_in_local_collection: 821 self._local_collection.remove(filename) 822 should_upload = True 823 824 if should_upload: 825 try: 826 self._files_to_upload.append((source, resume_offset, filename)) 827 except ArvPutUploadIsPending: 828 # This could happen when running on dry-mode, close cache file to 829 # avoid locking issues. 830 self._cache_file.close() 831 raise 832 833 def _upload_files(self): 834 for source, resume_offset, filename in self._files_to_upload: 835 with open(source, 'rb') as source_fd: 836 with self._state_lock: 837 self._state['files'][source]['mtime'] = os.path.getmtime(source) 838 self._state['files'][source]['size'] = os.path.getsize(source) 839 if resume_offset > 0: 840 # Start upload where we left off 841 output = self._local_collection.open(filename, 'ab') 842 source_fd.seek(resume_offset) 843 else: 844 # Start from scratch 845 output = self._local_collection.open(filename, 'wb') 846 self._write(source_fd, output) 847 output.close(flush=False) 848 849 def _write(self, source_fd, output): 850 while True: 851 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE) 852 if not data: 853 break 854 output.write(data) 855 856 def _my_collection(self): 857 return self._remote_collection if self.update else self._local_collection 858 859 def _get_cache_filepath(self): 860 # Set up cache file name from input paths. 861 md5 = hashlib.md5() 862 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode()) 863 realpaths = sorted(os.path.realpath(path) for path in self.paths) 864 md5.update(b'\0'.join([p.encode() for p in realpaths])) 865 if self.filename: 866 md5.update(self.filename.encode()) 867 cache_filename = md5.hexdigest() 868 cache_filepath = os.path.join( 869 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'), 870 cache_filename) 871 return cache_filepath 872 873 def _setup_state(self, update_collection): 874 """ 875 Create a new cache file or load a previously existing one. 
876 """ 877 # Load an already existing collection for update 878 if update_collection and re.match(arvados.util.collection_uuid_pattern, 879 update_collection): 880 try: 881 self._remote_collection = arvados.collection.Collection( 882 update_collection, 883 api_client=self._api_client, 884 storage_classes_desired=self.storage_classes, 885 num_retries=self.num_retries) 886 except arvados.errors.ApiError as error: 887 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error)) 888 else: 889 self.update = True 890 elif update_collection: 891 # Collection locator provided, but unknown format 892 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection)) 893 894 if self.use_cache: 895 cache_filepath = self._get_cache_filepath() 896 if self.resume and os.path.exists(cache_filepath): 897 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath)) 898 self._cache_file = open(cache_filepath, 'a+') 899 else: 900 # --no-resume means start with a empty cache file. 901 self.logger.info(u"Creating new cache file at {}".format(cache_filepath)) 902 self._cache_file = open(cache_filepath, 'w+') 903 self._cache_filename = self._cache_file.name 904 self._lock_file(self._cache_file) 905 self._cache_file.seek(0) 906 907 with self._state_lock: 908 if self.use_cache: 909 try: 910 self._state = json.load(self._cache_file) 911 if not set(['manifest', 'files']).issubset(set(self._state.keys())): 912 # Cache at least partially incomplete, set up new cache 913 self._state = copy.deepcopy(self.EMPTY_STATE) 914 except ValueError: 915 # Cache file empty, set up new cache 916 self._state = copy.deepcopy(self.EMPTY_STATE) 917 else: 918 self.logger.info("No cache usage requested for this run.") 919 # No cache file, set empty state 920 self._state = copy.deepcopy(self.EMPTY_STATE) 921 if not self._cached_manifest_valid(): 922 if not self.batch_mode: 923 raise ResumeCacheInvalidError() 924 else: 925 self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name)) 926 self.use_cache = False # Don't overwrite preexisting cache file. 927 self._state = copy.deepcopy(self.EMPTY_STATE) 928 # Load the previous manifest so we can check if files were modified remotely. 929 self._local_collection = arvados.collection.Collection( 930 self._state['manifest'], 931 replication_desired=self.replication_desired, 932 storage_classes_desired=self.storage_classes, 933 put_threads=self.put_threads, 934 api_client=self._api_client, 935 num_retries=self.num_retries) 936 937 def _cached_manifest_valid(self): 938 """ 939 Validate the oldest non-expired block signature to check if cached manifest 940 is usable: checking if the cached manifest was not created with a different 941 arvados account. 942 """ 943 if self._state.get('manifest', None) is None: 944 # No cached manifest yet, all good. 945 return True 946 now = datetime.datetime.utcnow() 947 oldest_exp = None 948 oldest_loc = None 949 block_found = False 950 for m in keep_locator_pattern.finditer(self._state['manifest']): 951 loc = m.group(0) 952 try: 953 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16)) 954 except IndexError: 955 # Locator without signature 956 continue 957 block_found = True 958 if exp > now and (oldest_exp is None or exp < oldest_exp): 959 oldest_exp = exp 960 oldest_loc = loc 961 if not block_found: 962 # No block signatures found => no invalid block signatures. 
963 return True 964 if oldest_loc is None: 965 # Locator signatures found, but all have expired. 966 # Reset the cache and move on. 967 self.logger.info('Cache expired, starting from scratch.') 968 self._state['manifest'] = '' 969 return True 970 kc = arvados.KeepClient(api_client=self._api_client, 971 num_retries=self.num_retries) 972 try: 973 kc.head(oldest_loc) 974 except arvados.errors.KeepRequestError: 975 # Something is wrong, cached manifest is not valid. 976 return False 977 return True 978 979 def collection_file_paths(self, col, path_prefix='.'): 980 """Return a list of file paths by recursively go through the entire collection `col`""" 981 file_paths = [] 982 for name, item in listitems(col): 983 if isinstance(item, arvados.arvfile.ArvadosFile): 984 file_paths.append(os.path.join(path_prefix, name)) 985 elif isinstance(item, arvados.collection.Subcollection): 986 new_prefix = os.path.join(path_prefix, name) 987 file_paths += self.collection_file_paths(item, path_prefix=new_prefix) 988 return file_paths 989 990 def _lock_file(self, fileobj): 991 try: 992 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) 993 except IOError: 994 raise ResumeCacheConflict(u"{} locked".format(fileobj.name)) 995 996 def _save_state(self): 997 """ 998 Atomically save current state into cache. 999 """ 1000 with self._state_lock: 1001 # We're not using copy.deepcopy() here because it's a lot slower 1002 # than json.dumps(), and we're already needing JSON format to be 1003 # saved on disk. 1004 state = json.dumps(self._state) 1005 try: 1006 new_cache = tempfile.NamedTemporaryFile( 1007 mode='w+', 1008 dir=os.path.dirname(self._cache_filename), delete=False) 1009 self._lock_file(new_cache) 1010 new_cache.write(state) 1011 new_cache.flush() 1012 os.fsync(new_cache) 1013 os.rename(new_cache.name, self._cache_filename) 1014 except (IOError, OSError, ResumeCacheConflict) as error: 1015 self.logger.error("There was a problem while saving the cache file: {}".format(error)) 1016 try: 1017 os.unlink(new_cache_name) 1018 except NameError: # mkstemp failed. 
1019 pass 1020 else: 1021 self._cache_file.close() 1022 self._cache_file = new_cache 1023 1024 def collection_name(self): 1025 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None 1026 1027 def collection_trash_at(self): 1028 return self._my_collection().get_trash_at() 1029 1030 def manifest_locator(self): 1031 return self._my_collection().manifest_locator() 1032 1033 def portable_data_hash(self): 1034 pdh = self._my_collection().portable_data_hash() 1035 m = self._my_collection().stripped_manifest().encode() 1036 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m)) 1037 if pdh != local_pdh: 1038 self.logger.warning("\n".join([ 1039 "arv-put: API server provided PDH differs from local manifest.", 1040 " This should not happen; showing API server version."])) 1041 return pdh 1042 1043 def manifest_text(self, stream_name=".", strip=False, normalize=False): 1044 return self._my_collection().manifest_text(stream_name, strip, normalize) 1045 1046 def _datablocks_on_item(self, item): 1047 """ 1048 Return a list of datablock locators, recursively navigating 1049 through subcollections 1050 """ 1051 if isinstance(item, arvados.arvfile.ArvadosFile): 1052 if item.size() == 0: 1053 # Empty file locator 1054 return ["d41d8cd98f00b204e9800998ecf8427e+0"] 1055 else: 1056 locators = [] 1057 for segment in item.segments(): 1058 loc = segment.locator 1059 locators.append(loc) 1060 return locators 1061 elif isinstance(item, arvados.collection.Collection): 1062 l = [self._datablocks_on_item(x) for x in listvalues(item)] 1063 # Fast list flattener method taken from: 1064 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python 1065 return [loc for sublist in l for loc in sublist] 1066 else: 1067 return None 1068 1069 def data_locators(self): 1070 with self._collection_lock: 1071 # Make sure all datablocks are flushed before getting the locators 1072 self._my_collection().manifest_text() 1073 datablocks = self._datablocks_on_item(self._my_collection()) 1074 return datablocks
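main(), further below, drives this class end to end: build the job, call start(), then read the resulting locator. A trimmed, hypothetical sketch of that flow:

import arvados

api_client = arvados.api('v1')
writer = ArvPutUploadJob(paths=['./results'],          # hypothetical input directory
                         name='example collection',    # hypothetical collection name
                         api_client=api_client,
                         num_retries=3)
writer.start(save_collection=True)    # uploads files, checkpointing as it goes, then saves the collection
print(writer.collection_name())
print(writer.manifest_locator())      # collection UUID
print(writer.portable_data_hash())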
447 def __init__(self, paths, resume=True, use_cache=True, reporter=None, 448 name=None, owner_uuid=None, api_client=None, batch_mode=False, 449 ensure_unique_name=False, num_retries=None, 450 put_threads=None, replication_desired=None, filename=None, 451 update_time=60.0, update_collection=None, storage_classes=None, 452 logger=logging.getLogger('arvados.arv_put'), dry_run=False, 453 follow_links=True, exclude_paths=[], exclude_names=None, 454 trash_at=None): 455 self.paths = paths 456 self.resume = resume 457 self.use_cache = use_cache 458 self.batch_mode = batch_mode 459 self.update = False 460 self.reporter = reporter 461 # This will set to 0 before start counting, if no special files are going 462 # to be read. 463 self.bytes_expected = None 464 self.bytes_written = 0 465 self.bytes_skipped = 0 466 self.name = name 467 self.owner_uuid = owner_uuid 468 self.ensure_unique_name = ensure_unique_name 469 self.num_retries = num_retries 470 self.replication_desired = replication_desired 471 self.put_threads = put_threads 472 self.filename = filename 473 self.storage_classes = storage_classes 474 self._api_client = api_client 475 self._state_lock = threading.Lock() 476 self._state = None # Previous run state (file list & manifest) 477 self._current_files = [] # Current run file list 478 self._cache_file = None 479 self._collection_lock = threading.Lock() 480 self._remote_collection = None # Collection being updated (if asked) 481 self._local_collection = None # Collection from previous run manifest 482 self._file_paths = set() # Files to be updated in remote collection 483 self._stop_checkpointer = threading.Event() 484 self._checkpointer = threading.Thread(target=self._update_task) 485 self._checkpointer.daemon = True 486 self._update_task_time = update_time # How many seconds wait between update runs 487 self._files_to_upload = FileUploadList(dry_run=dry_run) 488 self._upload_started = False 489 self.logger = logger 490 self.dry_run = dry_run 491 self._checkpoint_before_quit = True 492 self.follow_links = follow_links 493 self.exclude_paths = exclude_paths 494 self.exclude_names = exclude_names 495 self._trash_at = trash_at 496 497 if self._trash_at is not None: 498 if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]: 499 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta') 500 if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None: 501 raise TypeError('provided trash_at datetime should be timezone-naive') 502 503 if not self.use_cache and self.resume: 504 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False') 505 506 # Check for obvious dry-run responses 507 if self.dry_run and (not self.use_cache or not self.resume): 508 raise ArvPutUploadIsPending() 509 510 # Load cached data if any and if needed 511 self._setup_state(update_collection) 512 513 # Build the upload file list, excluding requested files and counting the 514 # bytes expected to be uploaded. 515 self._build_upload_list()
608 def start(self, save_collection): 609 """ 610 Start supporting thread & file uploading 611 """ 612 self._checkpointer.start() 613 try: 614 # Update bytes_written from current local collection and 615 # report initial progress. 616 self._update() 617 # Actual file upload 618 self._upload_started = True # Used by the update thread to start checkpointing 619 self._upload_files() 620 except (SystemExit, Exception) as e: 621 self._checkpoint_before_quit = False 622 # Log stack trace only when Ctrl-C isn't pressed (SIGINT) 623 # Note: We're expecting SystemExit instead of 624 # KeyboardInterrupt because we have a custom signal 625 # handler in place that raises SystemExit with the catched 626 # signal's code. 627 if isinstance(e, PathDoesNotExistError): 628 # We aren't interested in the traceback for this case 629 pass 630 elif not isinstance(e, SystemExit) or e.code != -2: 631 self.logger.warning("Abnormal termination:\n{}".format( 632 traceback.format_exc())) 633 raise 634 finally: 635 if not self.dry_run: 636 # Stop the thread before doing anything else 637 self._stop_checkpointer.set() 638 self._checkpointer.join() 639 if self._checkpoint_before_quit: 640 # Commit all pending blocks & one last _update() 641 self._local_collection.manifest_text() 642 self._update(final=True) 643 if save_collection: 644 self.save_collection() 645 if self.use_cache: 646 self._cache_file.close()
Start supporting thread & file uploading
659 def save_collection(self): 660 if self.update: 661 # Check if files should be updated on the remote collection. 662 for fp in self._file_paths: 663 remote_file = self._remote_collection.find(fp) 664 if not remote_file: 665 # File don't exist on remote collection, copy it. 666 self._remote_collection.copy(fp, fp, self._local_collection) 667 elif remote_file != self._local_collection.find(fp): 668 # A different file exist on remote collection, overwrite it. 669 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True) 670 else: 671 # The file already exist on remote collection, skip it. 672 pass 673 self._remote_collection.save(num_retries=self.num_retries, 674 trash_at=self._collection_trash_at()) 675 else: 676 if len(self._local_collection) == 0: 677 self.logger.warning("No files were uploaded, skipping collection creation.") 678 return 679 self._local_collection.save_new( 680 name=self.name, owner_uuid=self.owner_uuid, 681 ensure_unique_name=self.ensure_unique_name, 682 num_retries=self.num_retries, 683 trash_at=self._collection_trash_at())
979 def collection_file_paths(self, col, path_prefix='.'): 980 """Return a list of file paths by recursively go through the entire collection `col`""" 981 file_paths = [] 982 for name, item in listitems(col): 983 if isinstance(item, arvados.arvfile.ArvadosFile): 984 file_paths.append(os.path.join(path_prefix, name)) 985 elif isinstance(item, arvados.collection.Subcollection): 986 new_prefix = os.path.join(path_prefix, name) 987 file_paths += self.collection_file_paths(item, path_prefix=new_prefix) 988 return file_paths
Return a list of file paths, built by recursively walking the entire collection col
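For example, given a two-file collection, the traversal yields one path per regular file (job stands in for an ArvPutUploadJob instance; the manifest uses the empty-file locator seen elsewhere in this module):

import arvados.collection

manifest = (". d41d8cd98f00b204e9800998ecf8427e+0 0:0:a.txt\n"
            "./sub d41d8cd98f00b204e9800998ecf8427e+0 0:0:b.txt\n")
col = arvados.collection.Collection(manifest)
job.collection_file_paths(col, path_prefix="")   # -> ['a.txt', 'sub/b.txt'] (ordering may vary)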
1033 def portable_data_hash(self): 1034 pdh = self._my_collection().portable_data_hash() 1035 m = self._my_collection().stripped_manifest().encode() 1036 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m)) 1037 if pdh != local_pdh: 1038 self.logger.warning("\n".join([ 1039 "arv-put: API server provided PDH differs from local manifest.", 1040 " This should not happen; showing API server version."])) 1041 return pdh
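The local cross-check recomputes the portable data hash directly: the MD5 of the stripped manifest text joined with the manifest's byte length. Recomputed by hand it looks like this (the manifest is just the single empty-file example):

import hashlib

manifest = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n"
m = manifest.encode()
local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
print(local_pdh)   # "<md5 of the manifest bytes>+<manifest length in bytes>"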
1083def pathname_match(pathname, pattern): 1084 name = pathname.split(os.sep) 1085 # Fix patterns like 'some/subdir/' or 'some//subdir' 1086 pat = [x for x in pattern.split(os.sep) if x != '' and x != '.'] 1087 if len(name) != len(pat): 1088 return False 1089 for i in range(len(name)): 1090 if not fnmatch.fnmatch(name[i], pat[i]): 1091 return False 1092 return True
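Because the comparison is component-by-component (assuming a POSIX '/' separator), a pattern only matches paths with the same number of path segments; for example:

pathname_match('subdir/notes.txt', 'subdir/*.txt')    # True: two components on both sides
pathname_match('subdir/notes.txt', '*.txt')           # False: component counts differ
pathname_match('notes.txt', '*.txt')                  # True
pathname_match('subdir/notes.txt', './subdir/*.txt')  # True: '.' and empty components are dropped from the pattern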
1111def desired_project_uuid(api_client, project_uuid, num_retries): 1112 if not project_uuid: 1113 query = api_client.users().current() 1114 elif arvados.util.user_uuid_pattern.match(project_uuid): 1115 query = api_client.users().get(uuid=project_uuid) 1116 elif arvados.util.group_uuid_pattern.match(project_uuid): 1117 query = api_client.groups().get(uuid=project_uuid) 1118 else: 1119 raise ValueError("Not a valid project UUID: {}".format(project_uuid)) 1120 return query.execute(num_retries=num_retries)['uuid']
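Given no project UUID, this resolves to the calling user's own UUID (the Home project); with a group UUID it validates the format before the lookup. A brief sketch (the group UUID is a placeholder, not a real project):

import arvados

api_client = arvados.api('v1')
home_uuid = desired_project_uuid(api_client, None, num_retries=3)
project_uuid = desired_project_uuid(api_client, 'zzzzz-j7d0g-0123456789abcde', num_retries=3)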
1122def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr, 1123 install_sig_handlers=True): 1124 global api_client 1125 1126 args = parse_arguments(arguments) 1127 logger = logging.getLogger('arvados.arv_put') 1128 if args.silent: 1129 logger.setLevel(logging.WARNING) 1130 else: 1131 logger.setLevel(logging.INFO) 1132 status = 0 1133 1134 request_id = arvados.util.new_request_id() 1135 1136 formatter = ArvPutLogFormatter(request_id) 1137 logging.getLogger('arvados').handlers[0].setFormatter(formatter) 1138 1139 if api_client is None: 1140 api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries) 1141 1142 if install_sig_handlers: 1143 arv_cmd.install_signal_handlers() 1144 1145 # Trash arguments validation 1146 trash_at = None 1147 if args.trash_at is not None: 1148 # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we 1149 # make sure the user provides a complete YYYY-MM-DD date. 1150 if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at): 1151 logger.error("--trash-at argument format invalid, use --help to see examples.") 1152 sys.exit(1) 1153 # Check if no time information was provided. In that case, assume end-of-day. 1154 if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at): 1155 args.trash_at += 'T23:59:59' 1156 try: 1157 trash_at = ciso8601.parse_datetime(args.trash_at) 1158 except: 1159 logger.error("--trash-at argument format invalid, use --help to see examples.") 1160 sys.exit(1) 1161 else: 1162 if trash_at.tzinfo is not None: 1163 # Timezone aware datetime provided. 1164 utcoffset = -trash_at.utcoffset() 1165 else: 1166 # Timezone naive datetime provided. Assume is local. 1167 if time.daylight: 1168 utcoffset = datetime.timedelta(seconds=time.altzone) 1169 else: 1170 utcoffset = datetime.timedelta(seconds=time.timezone) 1171 # Convert to UTC timezone naive datetime. 
1172 trash_at = trash_at.replace(tzinfo=None) + utcoffset 1173 1174 if trash_at <= datetime.datetime.utcnow(): 1175 logger.error("--trash-at argument must be set in the future") 1176 sys.exit(1) 1177 if args.trash_after is not None: 1178 if args.trash_after < 1: 1179 logger.error("--trash-after argument must be >= 1") 1180 sys.exit(1) 1181 trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60)) 1182 1183 # Determine the name to use 1184 if args.name: 1185 if args.stream or args.raw: 1186 logger.error("Cannot use --name with --stream or --raw") 1187 sys.exit(1) 1188 elif args.update_collection: 1189 logger.error("Cannot use --name with --update-collection") 1190 sys.exit(1) 1191 collection_name = args.name 1192 else: 1193 collection_name = "Saved at {} by {}@{}".format( 1194 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"), 1195 pwd.getpwuid(os.getuid()).pw_name, 1196 socket.gethostname()) 1197 1198 if args.project_uuid and (args.stream or args.raw): 1199 logger.error("Cannot use --project-uuid with --stream or --raw") 1200 sys.exit(1) 1201 1202 # Determine the parent project 1203 try: 1204 project_uuid = desired_project_uuid(api_client, args.project_uuid, 1205 args.retries) 1206 except (apiclient_errors.Error, ValueError) as error: 1207 logger.error(error) 1208 sys.exit(1) 1209 1210 if args.progress: 1211 reporter = progress_writer(human_progress) 1212 elif args.batch_progress: 1213 reporter = progress_writer(machine_progress) 1214 else: 1215 reporter = None 1216 1217 # Split storage-classes argument 1218 storage_classes = None 1219 if args.storage_classes: 1220 storage_classes = args.storage_classes.strip().replace(' ', '').split(',') 1221 1222 # Setup exclude regex from all the --exclude arguments provided 1223 name_patterns = [] 1224 exclude_paths = [] 1225 exclude_names = None 1226 if len(args.exclude) > 0: 1227 # We're supporting 2 kinds of exclusion patterns: 1228 # 1) --exclude '*.jpg' (file/dir name patterns, will only match 1229 # the name, wherever the file is on the tree) 1230 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the 1231 # entire path, and should be relative to 1232 # any input dir argument) 1233 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs 1234 # placed directly underneath the input dir) 1235 for p in args.exclude: 1236 # Only relative paths patterns allowed 1237 if p.startswith(os.sep): 1238 logger.error("Cannot use absolute paths with --exclude") 1239 sys.exit(1) 1240 if os.path.dirname(p): 1241 # We don't support of path patterns with '..' 1242 p_parts = p.split(os.sep) 1243 if '..' in p_parts: 1244 logger.error( 1245 "Cannot use path patterns that include or '..'") 1246 sys.exit(1) 1247 # Path search pattern 1248 exclude_paths.append(p) 1249 else: 1250 # Name-only search pattern 1251 name_patterns.append(p) 1252 # For name only matching, we can combine all patterns into a single 1253 # regexp, for better performance. 1254 exclude_names = re.compile('|'.join( 1255 [fnmatch.translate(p) for p in name_patterns] 1256 )) if len(name_patterns) > 0 else None 1257 # Show the user the patterns to be used, just in case they weren't 1258 # specified inside quotes and got changed by the shell expansion. 1259 logger.info("Exclude patterns: {}".format(args.exclude)) 1260 1261 # If this is used by a human, and there's at least one directory to be 1262 # uploaded, the expected bytes calculation can take a moment. 
1263 if args.progress and any([os.path.isdir(f) for f in args.paths]): 1264 logger.info("Calculating upload size, this could take some time...") 1265 try: 1266 writer = ArvPutUploadJob(paths = args.paths, 1267 resume = args.resume, 1268 use_cache = args.use_cache, 1269 batch_mode= args.batch, 1270 filename = args.filename, 1271 reporter = reporter, 1272 api_client = api_client, 1273 num_retries = args.retries, 1274 replication_desired = args.replication, 1275 put_threads = args.threads, 1276 name = collection_name, 1277 owner_uuid = project_uuid, 1278 ensure_unique_name = True, 1279 update_collection = args.update_collection, 1280 storage_classes=storage_classes, 1281 logger=logger, 1282 dry_run=args.dry_run, 1283 follow_links=args.follow_links, 1284 exclude_paths=exclude_paths, 1285 exclude_names=exclude_names, 1286 trash_at=trash_at) 1287 except ResumeCacheConflict: 1288 logger.error("\n".join([ 1289 "arv-put: Another process is already uploading this data.", 1290 " Use --no-cache if this is really what you want."])) 1291 sys.exit(1) 1292 except ResumeCacheInvalidError: 1293 logger.error("\n".join([ 1294 "arv-put: Resume cache contains invalid signature: it may have expired", 1295 " or been created with another Arvados user's credentials.", 1296 " Switch user or use one of the following options to restart upload:", 1297 " --no-resume to start a new resume cache.", 1298 " --no-cache to disable resume cache.", 1299 " --batch to ignore the resume cache if invalid."])) 1300 sys.exit(1) 1301 except (CollectionUpdateError, PathDoesNotExistError) as error: 1302 logger.error("\n".join([ 1303 "arv-put: %s" % str(error)])) 1304 sys.exit(1) 1305 except ArvPutUploadIsPending: 1306 # Dry run check successful, return proper exit code. 1307 sys.exit(2) 1308 except ArvPutUploadNotPending: 1309 # No files pending for upload 1310 sys.exit(0) 1311 1312 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0: 1313 logger.warning("\n".join([ 1314 "arv-put: Resuming previous upload from last checkpoint.", 1315 " Use the --no-resume option to start over."])) 1316 1317 if not args.dry_run: 1318 writer.report_progress() 1319 output = None 1320 try: 1321 writer.start(save_collection=not(args.stream or args.raw)) 1322 except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error: 1323 logger.error("\n".join([ 1324 "arv-put: %s" % str(error)])) 1325 sys.exit(1) 1326 1327 if args.progress: # Print newline to split stderr from stdout for humans. 1328 logger.info("\n") 1329 1330 if args.stream: 1331 if args.normalize: 1332 output = writer.manifest_text(normalize=True) 1333 else: 1334 output = writer.manifest_text() 1335 elif args.raw: 1336 output = ','.join(writer.data_locators()) 1337 elif writer.manifest_locator() is not None: 1338 try: 1339 expiration_notice = "" 1340 if writer.collection_trash_at() is not None: 1341 # Get the local timezone-naive version, and log it with timezone information. 1342 if time.daylight: 1343 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone) 1344 else: 1345 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone) 1346 expiration_notice = ". 
It will expire on {} {}.".format( 1347 local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z")) 1348 if args.update_collection: 1349 logger.info(u"Collection updated: '{}'{}".format( 1350 writer.collection_name(), expiration_notice)) 1351 else: 1352 logger.info(u"Collection saved as '{}'{}".format( 1353 writer.collection_name(), expiration_notice)) 1354 if args.portable_data_hash: 1355 output = writer.portable_data_hash() 1356 else: 1357 output = writer.manifest_locator() 1358 except apiclient_errors.Error as error: 1359 logger.error( 1360 "arv-put: Error creating Collection on project: {}.".format( 1361 error)) 1362 status = 1 1363 else: 1364 status = 1 1365 1366 # Print the locator (uuid) of the new collection. 1367 if output is None: 1368 status = status or 1 1369 elif not args.silent: 1370 stdout.write(output) 1371 if not output.endswith('\n'): 1372 stdout.write('\n') 1373 1374 if install_sig_handlers: 1375 arv_cmd.restore_signal_handlers() 1376 1377 if status != 0: 1378 sys.exit(status) 1379 1380 # Success! 1381 return output
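Putting it all together, a programmatic call to main() mirrors a typical arv-put invocation; the values below are illustrative, and the trailing slash uploads the directory's contents rather than the directory itself:

output = main(['--project-uuid', 'zzzzz-j7d0g-0123456789abcde',   # placeholder project
               '--name', 'nightly results',
               '--exclude', '*.tmp',
               '--trash-at', '2030-12-31',
               './results/'])
print(output)   # the saved collection's UUID (or its PDH when --portable-data-hash is given)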