arvados.commands.put
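Source for the arv-put command-line tool, which copies data from the local filesystem to Keep and, by default, saves the result as an Arvados Collection. The module can also be driven programmatically through main(). Below is a minimal sketch, assuming the Arvados Python SDK is installed and ARVADOS_API_HOST / ARVADOS_API_TOKEN are configured in the environment; the file name and collection name are placeholders, not part of this module, and note that main() calls sys.exit() on errors.

    # Minimal sketch (not part of this module): upload one local file and
    # print the resulting collection UUID. Assumes valid API credentials in
    # the environment; 'data.csv' and the collection name are hypothetical.
    from arvados.commands.put import main

    locator = main([
        'data.csv',                   # local file to upload
        '--name', 'example upload',   # collection name (otherwise autogenerated)
        '--silent',                    # suppress informational output; main() still returns the locator
    ])
    print(locator)
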
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4 5import argparse 6import arvados 7import arvados.collection 8import base64 9import ciso8601 10import copy 11import datetime 12import errno 13import fcntl 14import fnmatch 15import hashlib 16import json 17import logging 18import os 19import pwd 20import re 21import signal 22import socket 23import sys 24import tempfile 25import threading 26import time 27import traceback 28 29from pathlib import Path 30 31import arvados.util 32import arvados.commands._util as arv_cmd 33 34from apiclient import errors as apiclient_errors 35from arvados._internal import basedirs 36from arvados._version import __version__ 37 38api_client = None 39 40upload_opts = argparse.ArgumentParser(add_help=False) 41 42upload_opts.add_argument('--version', action='version', 43 version="%s %s" % (sys.argv[0], __version__), 44 help='Print version and exit.') 45upload_opts.add_argument('paths', metavar='path', type=str, nargs='*', 46 help=""" 47Local file or directory. If path is a directory reference with a trailing 48slash, then just upload the directory's contents; otherwise upload the 49directory itself. Default: read from standard input. 50""") 51 52_group = upload_opts.add_mutually_exclusive_group() 53 54_group.add_argument('--max-manifest-depth', type=int, metavar='N', 55 default=-1, help=argparse.SUPPRESS) 56 57_group.add_argument('--normalize', action='store_true', 58 help=""" 59Normalize the manifest by re-ordering files and streams after writing 60data. 61""") 62 63_group.add_argument('--dry-run', action='store_true', default=False, 64 help=""" 65Don't actually upload files, but only check if any file should be 66uploaded. Exit with code=2 when files are pending for upload. 67""") 68 69_group = upload_opts.add_mutually_exclusive_group() 70 71_group.add_argument('--as-stream', action='store_true', dest='stream', 72 help=""" 73Synonym for --stream. 74""") 75 76_group.add_argument('--stream', action='store_true', 77 help=""" 78Store the file content and display the resulting manifest on 79stdout. Do not save a Collection object in Arvados. 80""") 81 82_group.add_argument('--as-manifest', action='store_true', dest='manifest', 83 help=""" 84Synonym for --manifest. 85""") 86 87_group.add_argument('--in-manifest', action='store_true', dest='manifest', 88 help=""" 89Synonym for --manifest. 90""") 91 92_group.add_argument('--manifest', action='store_true', 93 help=""" 94Store the file data and resulting manifest in Keep, save a Collection 95object in Arvados, and display the manifest locator (Collection uuid) 96on stdout. This is the default behavior. 97""") 98 99_group.add_argument('--as-raw', action='store_true', dest='raw', 100 help=""" 101Synonym for --raw. 102""") 103 104_group.add_argument('--raw', action='store_true', 105 help=""" 106Store the file content and display the data block locators on stdout, 107separated by commas, with a trailing newline. Do not store a 108manifest. 109""") 110 111upload_opts.add_argument('--update-collection', type=str, default=None, 112 dest='update_collection', metavar="UUID", help=""" 113Update an existing collection identified by the given Arvados collection 114UUID. All new local files will be uploaded. 115""") 116 117upload_opts.add_argument('--use-filename', type=str, default=None, 118 dest='filename', help=""" 119Synonym for --filename. 
120""") 121 122upload_opts.add_argument('--filename', type=str, default=None, 123 help=""" 124Use the given filename in the manifest, instead of the name of the 125local file. This is useful when "-" or "/dev/stdin" is given as an 126input file. It can be used only if there is exactly one path given and 127it is not a directory. Implies --manifest. 128""") 129 130upload_opts.add_argument('--portable-data-hash', action='store_true', 131 help=""" 132Print the portable data hash instead of the Arvados UUID for the collection 133created by the upload. 134""") 135 136upload_opts.add_argument('--replication', type=int, metavar='N', default=None, 137 help=""" 138Set the replication level for the new collection: how many different 139physical storage devices (e.g., disks) should have a copy of each data 140block. Default is to use the server-provided default (if any) or 2. 141""") 142 143upload_opts.add_argument( 144 '--storage-classes', 145 type=arv_cmd.UniqueSplit(), 146 help=""" 147Specify comma separated list of storage classes to be used when saving data to Keep. 148""") 149 150upload_opts.add_argument('--threads', type=int, metavar='N', default=None, 151 help=""" 152Set the number of upload threads to be used. Take into account that 153using lots of threads will increase the RAM requirements. Default is 154to use 2 threads. 155On high latency installations, using a greater number will improve 156overall throughput. 157""") 158 159upload_opts.add_argument('--exclude', metavar='PATTERN', default=[], 160 action='append', help=""" 161Exclude files and directories whose names match the given glob pattern. When 162using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir' 163directory, relative to the provided input dirs will be excluded. 164When using a filename pattern like '*.txt', any text file will be excluded 165no matter where it is placed. 166For the special case of needing to exclude only files or dirs directly below 167the given input directory, you can use a pattern like './exclude_this.gif'. 168You can specify multiple patterns by using this argument more than once. 169""") 170 171_group = upload_opts.add_mutually_exclusive_group() 172_group.add_argument('--follow-links', action='store_true', default=True, 173 dest='follow_links', help=""" 174Follow file and directory symlinks (default). 175""") 176_group.add_argument('--no-follow-links', action='store_false', dest='follow_links', 177 help=""" 178Ignore file and directory symlinks. Even paths given explicitly on the 179command line will be skipped if they are symlinks. 180""") 181 182 183run_opts = argparse.ArgumentParser(add_help=False) 184 185run_opts.add_argument('--project-uuid', metavar='UUID', help=""" 186Store the collection in the specified project, instead of your Home 187project. 188""") 189 190run_opts.add_argument('--name', help=""" 191Save the collection with the specified name. 192""") 193 194_group = run_opts.add_mutually_exclusive_group() 195_group.add_argument('--progress', action='store_true', 196 help=""" 197Display human-readable progress on stderr (bytes and, if possible, 198percentage of total data size). This is the default behavior when 199stderr is a tty. 200""") 201 202_group.add_argument('--no-progress', action='store_true', 203 help=""" 204Do not display human-readable progress on stderr, even if stderr is a 205tty. 
206""") 207 208_group.add_argument('--batch-progress', action='store_true', 209 help=""" 210Display machine-readable progress on stderr (bytes and, if known, 211total data size). 212""") 213 214run_opts.add_argument('--silent', action='store_true', 215 help=""" 216Do not print any debug messages to console. (Any error messages will 217still be displayed.) 218""") 219 220run_opts.add_argument('--batch', action='store_true', default=False, 221 help=""" 222Retries with '--no-resume --no-cache' if cached state contains invalid/expired 223block signatures. 224""") 225 226_group = run_opts.add_mutually_exclusive_group() 227_group.add_argument('--resume', action='store_true', default=True, 228 help=""" 229Continue interrupted uploads from cached state (default). 230""") 231_group.add_argument('--no-resume', action='store_false', dest='resume', 232 help=""" 233Do not continue interrupted uploads from cached state. 234""") 235 236_group = run_opts.add_mutually_exclusive_group() 237_group.add_argument('--cache', action='store_true', dest='use_cache', default=True, 238 help=""" 239Save upload state in a cache file for resuming (default). 240""") 241_group.add_argument('--no-cache', action='store_false', dest='use_cache', 242 help=""" 243Do not save upload state in a cache file for resuming. 244""") 245 246_group = upload_opts.add_mutually_exclusive_group() 247_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None, 248 help=""" 249Set the trash date of the resulting collection to an absolute date in the future. 250The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n 251Timezone information can be added. If not, the provided date/time is assumed as being in the local system's timezone. 252""") 253_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None, 254 help=""" 255Set the trash date of the resulting collection to an amount of days from the 256date/time that the upload process finishes. 257""") 258 259arg_parser = argparse.ArgumentParser( 260 description='Copy data from the local filesystem to Keep.', 261 parents=[upload_opts, run_opts, arv_cmd.retry_opt]) 262 263def parse_arguments(arguments): 264 args = arg_parser.parse_args(arguments) 265 266 if len(args.paths) == 0: 267 args.paths = ['-'] 268 269 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths] 270 271 if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])): 272 arg_parser.error(""" 273 --filename argument cannot be used when storing a directory or 274 multiple files. 275 """) 276 277 # Turn on --progress by default if stderr is a tty. 278 if (not (args.batch_progress or args.no_progress or args.silent) 279 and os.isatty(sys.stderr.fileno())): 280 args.progress = True 281 282 # Turn off --resume (default) if --no-cache is used. 283 if not args.use_cache: 284 args.resume = False 285 286 if args.paths == ['-']: 287 if args.update_collection: 288 arg_parser.error(""" 289 --update-collection cannot be used when reading from stdin. 
290 """) 291 args.resume = False 292 args.use_cache = False 293 if not args.filename: 294 args.filename = 'stdin' 295 296 # Remove possible duplicated patterns 297 if len(args.exclude) > 0: 298 args.exclude = list(set(args.exclude)) 299 300 return args 301 302 303class PathDoesNotExistError(Exception): 304 pass 305 306 307class CollectionUpdateError(Exception): 308 pass 309 310 311class ResumeCacheConflict(Exception): 312 pass 313 314 315class ResumeCacheInvalidError(Exception): 316 pass 317 318class ArvPutArgumentConflict(Exception): 319 pass 320 321 322class ArvPutUploadIsPending(Exception): 323 pass 324 325 326class ArvPutUploadNotPending(Exception): 327 pass 328 329 330class FileUploadList(list): 331 def __init__(self, dry_run=False): 332 list.__init__(self) 333 self.dry_run = dry_run 334 335 def append(self, other): 336 if self.dry_run: 337 raise ArvPutUploadIsPending() 338 super(FileUploadList, self).append(other) 339 340 341# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG 342class ArvPutLogFormatter(logging.Formatter): 343 std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format) 344 err_fmtr = None 345 request_id_informed = False 346 347 def __init__(self, request_id): 348 self.err_fmtr = logging.Formatter( 349 arvados.log_format+' (X-Request-Id: {})'.format(request_id), 350 arvados.log_date_format) 351 352 def format(self, record): 353 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)): 354 self.request_id_informed = True 355 return self.err_fmtr.format(record) 356 return self.std_fmtr.format(record) 357 358 359class ResumeCache(object): 360 CACHE_DIR = 'arv-put' 361 362 def __init__(self, file_spec): 363 self.cache_file = open(file_spec, 'a+') 364 self._lock_file(self.cache_file) 365 self.filename = self.cache_file.name 366 367 @classmethod 368 def make_path(cls, args): 369 md5 = hashlib.md5() 370 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode()) 371 realpaths = sorted(os.path.realpath(path) for path in args.paths) 372 md5.update(b'\0'.join([p.encode() for p in realpaths])) 373 if any(os.path.isdir(path) for path in realpaths): 374 md5.update(b'-1') 375 elif args.filename: 376 md5.update(args.filename.encode()) 377 cache_path = Path(cls.CACHE_DIR) 378 if len(cache_path.parts) == 1: 379 cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path) 380 else: 381 # Note this is a noop if cache_path is absolute, which is what we want. 
382 cache_path = Path.home() / cache_path 383 cache_path.mkdir(parents=True, exist_ok=True, mode=0o700) 384 return str(cache_path / md5.hexdigest()) 385 386 def _lock_file(self, fileobj): 387 try: 388 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) 389 except IOError: 390 raise ResumeCacheConflict(u"{} locked".format(fileobj.name)) 391 392 def load(self): 393 self.cache_file.seek(0) 394 return json.load(self.cache_file) 395 396 def check_cache(self, api_client=None, num_retries=0): 397 try: 398 state = self.load() 399 locator = None 400 try: 401 if "_finished_streams" in state and len(state["_finished_streams"]) > 0: 402 locator = state["_finished_streams"][0][1][0] 403 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0: 404 locator = state["_current_stream_locators"][0] 405 if locator is not None: 406 kc = arvados.keep.KeepClient(api_client=api_client) 407 kc.head(locator, num_retries=num_retries) 408 except Exception as e: 409 self.restart() 410 except (ValueError): 411 pass 412 413 def save(self, data): 414 try: 415 new_cache_fd, new_cache_name = tempfile.mkstemp( 416 dir=os.path.dirname(self.filename)) 417 self._lock_file(new_cache_fd) 418 new_cache = os.fdopen(new_cache_fd, 'r+') 419 json.dump(data, new_cache) 420 os.rename(new_cache_name, self.filename) 421 except (IOError, OSError, ResumeCacheConflict): 422 try: 423 os.unlink(new_cache_name) 424 except NameError: # mkstemp failed. 425 pass 426 else: 427 self.cache_file.close() 428 self.cache_file = new_cache 429 430 def close(self): 431 self.cache_file.close() 432 433 def destroy(self): 434 try: 435 os.unlink(self.filename) 436 except OSError as error: 437 if error.errno != errno.ENOENT: # That's what we wanted anyway. 438 raise 439 self.close() 440 441 def restart(self): 442 self.destroy() 443 self.__init__(self.filename) 444 445 446class ArvPutUploadJob(object): 447 CACHE_DIR = 'arv-put' 448 EMPTY_STATE = { 449 'manifest' : None, # Last saved manifest checkpoint 450 'files' : {} # Previous run file list: {path : {size, mtime}} 451 } 452 453 def __init__(self, paths, resume=True, use_cache=True, reporter=None, 454 name=None, owner_uuid=None, api_client=None, batch_mode=False, 455 ensure_unique_name=False, num_retries=None, 456 put_threads=None, replication_desired=None, filename=None, 457 update_time=60.0, update_collection=None, storage_classes=None, 458 logger=logging.getLogger('arvados.arv_put'), dry_run=False, 459 follow_links=True, exclude_paths=[], exclude_names=None, 460 trash_at=None): 461 self.paths = paths 462 self.resume = resume 463 self.use_cache = use_cache 464 self.batch_mode = batch_mode 465 self.update = False 466 self.reporter = reporter 467 # This will set to 0 before start counting, if no special files are going 468 # to be read. 
469 self.bytes_expected = None 470 self.bytes_written = 0 471 self.bytes_skipped = 0 472 self.name = name 473 self.owner_uuid = owner_uuid 474 self.ensure_unique_name = ensure_unique_name 475 self.num_retries = num_retries 476 self.replication_desired = replication_desired 477 self.put_threads = put_threads 478 self.filename = filename 479 self.storage_classes = storage_classes 480 self._api_client = api_client 481 self._state_lock = threading.Lock() 482 self._state = None # Previous run state (file list & manifest) 483 self._current_files = [] # Current run file list 484 self._cache_file = None 485 self._collection_lock = threading.Lock() 486 self._remote_collection = None # Collection being updated (if asked) 487 self._local_collection = None # Collection from previous run manifest 488 self._file_paths = set() # Files to be updated in remote collection 489 self._stop_checkpointer = threading.Event() 490 self._checkpointer = threading.Thread(target=self._update_task) 491 self._checkpointer.daemon = True 492 self._update_task_time = update_time # How many seconds wait between update runs 493 self._files_to_upload = FileUploadList(dry_run=dry_run) 494 self._upload_started = False 495 self.logger = logger 496 self.dry_run = dry_run 497 self._checkpoint_before_quit = True 498 self.follow_links = follow_links 499 self.exclude_paths = exclude_paths 500 self.exclude_names = exclude_names 501 self._trash_at = trash_at 502 503 if self._trash_at is not None: 504 if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]: 505 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta') 506 if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None: 507 raise TypeError('provided trash_at datetime should be timezone-naive') 508 509 if not self.use_cache and self.resume: 510 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False') 511 512 # Check for obvious dry-run responses 513 if self.dry_run and (not self.use_cache or not self.resume): 514 raise ArvPutUploadIsPending() 515 516 # Load cached data if any and if needed 517 self._setup_state(update_collection) 518 519 # Build the upload file list, excluding requested files and counting the 520 # bytes expected to be uploaded. 521 self._build_upload_list() 522 523 def _build_upload_list(self): 524 """ 525 Scan the requested paths to count file sizes, excluding requested files 526 and dirs and building the upload file list. 527 """ 528 # If there aren't special files to be read, reset total bytes count to zero 529 # to start counting. 530 if not any([p for p in self.paths 531 if not (os.path.isfile(p) or os.path.isdir(p))]): 532 self.bytes_expected = 0 533 534 for path in self.paths: 535 # Test for stdin first, in case some file named '-' exist 536 if path == '-': 537 if self.dry_run: 538 raise ArvPutUploadIsPending() 539 self._write_stdin(self.filename or 'stdin') 540 elif not os.path.exists(path): 541 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path)) 542 elif (not self.follow_links) and os.path.islink(path): 543 self.logger.warning("Skipping symlink '{}'".format(path)) 544 continue 545 elif os.path.isdir(path): 546 # Use absolute paths on cache index so CWD doesn't interfere 547 # with the caching logic. 548 orig_path = path 549 path = os.path.abspath(path) 550 if orig_path[-1:] == os.sep: 551 # When passing a directory reference with a trailing slash, 552 # its contents should be uploaded directly to the 553 # collection's root. 
554 prefixdir = path 555 else: 556 # When passing a directory reference with no trailing slash, 557 # upload the directory to the collection's root. 558 prefixdir = os.path.dirname(path) 559 prefixdir += os.sep 560 for root, dirs, files in os.walk(path, 561 followlinks=self.follow_links): 562 root_relpath = os.path.relpath(root, path) 563 if root_relpath == '.': 564 root_relpath = '' 565 # Exclude files/dirs by full path matching pattern 566 if self.exclude_paths: 567 dirs[:] = [d for d in dirs 568 if not any(pathname_match( 569 os.path.join(root_relpath, d), pat) 570 for pat in self.exclude_paths)] 571 files = [f for f in files 572 if not any(pathname_match( 573 os.path.join(root_relpath, f), pat) 574 for pat in self.exclude_paths)] 575 # Exclude files/dirs by name matching pattern 576 if self.exclude_names is not None: 577 dirs[:] = [d for d in dirs 578 if not self.exclude_names.match(d)] 579 files = [f for f in files 580 if not self.exclude_names.match(f)] 581 # Make os.walk()'s dir traversing order deterministic 582 dirs.sort() 583 files.sort() 584 for f in files: 585 filepath = os.path.join(root, f) 586 if not os.path.isfile(filepath): 587 self.logger.warning("Skipping non-regular file '{}'".format(filepath)) 588 continue 589 # Add its size to the total bytes count (if applicable) 590 if self.follow_links or (not os.path.islink(filepath)): 591 if self.bytes_expected is not None: 592 self.bytes_expected += os.path.getsize(filepath) 593 self._check_file(filepath, 594 os.path.join(root[len(prefixdir):], f)) 595 else: 596 filepath = os.path.abspath(path) 597 # Add its size to the total bytes count (if applicable) 598 if self.follow_links or (not os.path.islink(filepath)): 599 if self.bytes_expected is not None: 600 self.bytes_expected += os.path.getsize(filepath) 601 self._check_file(filepath, 602 self.filename or os.path.basename(path)) 603 # If dry-mode is on, and got up to this point, then we should notify that 604 # there aren't any file to upload. 605 if self.dry_run: 606 raise ArvPutUploadNotPending() 607 # Remove local_collection's files that don't exist locally anymore, so the 608 # bytes_written count is correct. 609 for f in self.collection_file_paths(self._local_collection, 610 path_prefix=""): 611 if f != 'stdin' and f != self.filename and not f in self._file_paths: 612 self._local_collection.remove(f) 613 614 def start(self, save_collection): 615 """ 616 Start supporting thread & file uploading 617 """ 618 self._checkpointer.start() 619 try: 620 # Update bytes_written from current local collection and 621 # report initial progress. 622 self._update() 623 # Actual file upload 624 self._upload_started = True # Used by the update thread to start checkpointing 625 self._upload_files() 626 except (SystemExit, Exception) as e: 627 self._checkpoint_before_quit = False 628 # Log stack trace only when Ctrl-C isn't pressed (SIGINT) 629 # Note: We're expecting SystemExit instead of 630 # KeyboardInterrupt because we have a custom signal 631 # handler in place that raises SystemExit with the catched 632 # signal's code. 
633 if isinstance(e, PathDoesNotExistError): 634 # We aren't interested in the traceback for this case 635 pass 636 elif not isinstance(e, SystemExit) or e.code != -2: 637 self.logger.warning("Abnormal termination:\n{}".format( 638 traceback.format_exc())) 639 raise 640 finally: 641 if not self.dry_run: 642 # Stop the thread before doing anything else 643 self._stop_checkpointer.set() 644 self._checkpointer.join() 645 if self._checkpoint_before_quit: 646 # Commit all pending blocks & one last _update() 647 self._local_collection.manifest_text() 648 self._update(final=True) 649 if save_collection: 650 self.save_collection() 651 if self.use_cache: 652 self._cache_file.close() 653 654 def _collection_trash_at(self): 655 """ 656 Returns the trash date that the collection should use at save time. 657 Takes into account absolute/relative trash_at values requested 658 by the user. 659 """ 660 if type(self._trash_at) == datetime.timedelta: 661 # Get an absolute datetime for trash_at 662 return datetime.datetime.utcnow() + self._trash_at 663 return self._trash_at 664 665 def save_collection(self): 666 if self.update: 667 # Check if files should be updated on the remote collection. 668 for fp in self._file_paths: 669 remote_file = self._remote_collection.find(fp) 670 if not remote_file: 671 # File don't exist on remote collection, copy it. 672 self._remote_collection.copy(fp, fp, self._local_collection) 673 elif remote_file != self._local_collection.find(fp): 674 # A different file exist on remote collection, overwrite it. 675 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True) 676 else: 677 # The file already exist on remote collection, skip it. 678 pass 679 self._remote_collection.save(num_retries=self.num_retries, 680 trash_at=self._collection_trash_at()) 681 else: 682 if len(self._local_collection) == 0: 683 self.logger.warning("No files were uploaded, skipping collection creation.") 684 return 685 self._local_collection.save_new( 686 name=self.name, owner_uuid=self.owner_uuid, 687 ensure_unique_name=self.ensure_unique_name, 688 num_retries=self.num_retries, 689 trash_at=self._collection_trash_at()) 690 691 def destroy_cache(self): 692 if self.use_cache: 693 try: 694 os.unlink(self._cache_filename) 695 except OSError as error: 696 # That's what we wanted anyway. 697 if error.errno != errno.ENOENT: 698 raise 699 self._cache_file.close() 700 701 def _collection_size(self, collection): 702 """ 703 Recursively get the total size of the collection 704 """ 705 size = 0 706 for item in collection.values(): 707 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection): 708 size += self._collection_size(item) 709 else: 710 size += item.size() 711 return size 712 713 def _update_task(self): 714 """ 715 Periodically called support task. File uploading is 716 asynchronous so we poll status from the collection. 717 """ 718 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time): 719 self._update() 720 721 def _update(self, final=False): 722 """ 723 Update cached manifest text and report progress. 
724 """ 725 if self._upload_started: 726 with self._collection_lock: 727 self.bytes_written = self._collection_size(self._local_collection) 728 if self.use_cache: 729 if final: 730 manifest = self._local_collection.manifest_text() 731 else: 732 # Get the manifest text without comitting pending blocks 733 manifest = self._local_collection.manifest_text(strip=False, 734 normalize=False, 735 only_committed=True) 736 # Update cache 737 with self._state_lock: 738 self._state['manifest'] = manifest 739 if self.use_cache: 740 try: 741 self._save_state() 742 except Exception as e: 743 self.logger.error("Unexpected error trying to save cache file: {}".format(e)) 744 # Keep remote collection's trash_at attribute synced when using relative expire dates 745 if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta: 746 try: 747 self._api_client.collections().update( 748 uuid=self._remote_collection.manifest_locator(), 749 body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")} 750 ).execute(num_retries=self.num_retries) 751 except Exception as e: 752 self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e)) 753 else: 754 self.bytes_written = self.bytes_skipped 755 # Call the reporter, if any 756 self.report_progress() 757 758 def report_progress(self): 759 if self.reporter is not None: 760 self.reporter(self.bytes_written, self.bytes_expected) 761 762 def _write_stdin(self, filename): 763 output = self._local_collection.open(filename, 'wb') 764 self._write(sys.stdin.buffer, output) 765 output.close() 766 767 def _check_file(self, source, filename): 768 """ 769 Check if this file needs to be uploaded 770 """ 771 # Ignore symlinks when requested 772 if (not self.follow_links) and os.path.islink(source): 773 return 774 resume_offset = 0 775 should_upload = False 776 new_file_in_cache = False 777 # Record file path for updating the remote collection before exiting 778 self._file_paths.add(filename) 779 780 with self._state_lock: 781 # If no previous cached data on this file, store it for an eventual 782 # repeated run. 783 if source not in self._state['files']: 784 self._state['files'][source] = { 785 'mtime': os.path.getmtime(source), 786 'size' : os.path.getsize(source) 787 } 788 new_file_in_cache = True 789 cached_file_data = self._state['files'][source] 790 791 # Check if file was already uploaded (at least partially) 792 file_in_local_collection = self._local_collection.find(filename) 793 794 # If not resuming, upload the full file. 795 if not self.resume: 796 should_upload = True 797 # New file detected from last run, upload it. 798 elif new_file_in_cache: 799 should_upload = True 800 # Local file didn't change from last run. 801 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source): 802 if not file_in_local_collection: 803 # File not uploaded yet, upload it completely 804 should_upload = True 805 elif file_in_local_collection.permission_expired(): 806 # Permission token expired, re-upload file. This will change whenever 807 # we have a API for refreshing tokens. 808 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename)) 809 should_upload = True 810 self._local_collection.remove(filename) 811 elif cached_file_data['size'] == file_in_local_collection.size(): 812 # File already there, skip it. 
813 self.bytes_skipped += cached_file_data['size'] 814 elif cached_file_data['size'] > file_in_local_collection.size(): 815 # File partially uploaded, resume! 816 resume_offset = file_in_local_collection.size() 817 self.bytes_skipped += resume_offset 818 should_upload = True 819 else: 820 # Inconsistent cache, re-upload the file 821 should_upload = True 822 self._local_collection.remove(filename) 823 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source)) 824 # Local file differs from cached data, re-upload it. 825 else: 826 if file_in_local_collection: 827 self._local_collection.remove(filename) 828 should_upload = True 829 830 if should_upload: 831 try: 832 self._files_to_upload.append((source, resume_offset, filename)) 833 except ArvPutUploadIsPending: 834 # This could happen when running on dry-mode, close cache file to 835 # avoid locking issues. 836 self._cache_file.close() 837 raise 838 839 def _upload_files(self): 840 for source, resume_offset, filename in self._files_to_upload: 841 with open(source, 'rb') as source_fd: 842 with self._state_lock: 843 self._state['files'][source]['mtime'] = os.path.getmtime(source) 844 self._state['files'][source]['size'] = os.path.getsize(source) 845 if resume_offset > 0: 846 # Start upload where we left off 847 output = self._local_collection.open(filename, 'ab') 848 source_fd.seek(resume_offset) 849 else: 850 # Start from scratch 851 output = self._local_collection.open(filename, 'wb') 852 self._write(source_fd, output) 853 output.close(flush=False) 854 855 def _write(self, source_fd, output): 856 while True: 857 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE) 858 if not data: 859 break 860 output.write(data) 861 862 def _my_collection(self): 863 return self._remote_collection if self.update else self._local_collection 864 865 def _get_cache_filepath(self): 866 # Set up cache file name from input paths. 867 md5 = hashlib.md5() 868 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode()) 869 realpaths = sorted(os.path.realpath(path) for path in self.paths) 870 md5.update(b'\0'.join([p.encode() for p in realpaths])) 871 if self.filename: 872 md5.update(self.filename.encode()) 873 cache_path = Path(self.CACHE_DIR) 874 if len(cache_path.parts) == 1: 875 cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path) 876 else: 877 # Note this is a noop if cache_path is absolute, which is what we want. 878 cache_path = Path.home() / cache_path 879 cache_path.mkdir(parents=True, exist_ok=True, mode=0o700) 880 return str(cache_path / md5.hexdigest()) 881 882 def _setup_state(self, update_collection): 883 """ 884 Create a new cache file or load a previously existing one. 
885 """ 886 # Load an already existing collection for update 887 if update_collection and re.match(arvados.util.collection_uuid_pattern, 888 update_collection): 889 try: 890 self._remote_collection = arvados.collection.Collection( 891 update_collection, 892 api_client=self._api_client, 893 storage_classes_desired=self.storage_classes, 894 num_retries=self.num_retries) 895 except arvados.errors.ApiError as error: 896 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error)) 897 else: 898 self.update = True 899 elif update_collection: 900 # Collection locator provided, but unknown format 901 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection)) 902 903 if self.use_cache: 904 cache_filepath = self._get_cache_filepath() 905 if self.resume and os.path.exists(cache_filepath): 906 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath)) 907 self._cache_file = open(cache_filepath, 'a+') 908 else: 909 # --no-resume means start with a empty cache file. 910 self.logger.info(u"Creating new cache file at {}".format(cache_filepath)) 911 self._cache_file = open(cache_filepath, 'w+') 912 self._cache_filename = self._cache_file.name 913 self._lock_file(self._cache_file) 914 self._cache_file.seek(0) 915 916 with self._state_lock: 917 if self.use_cache: 918 try: 919 self._state = json.load(self._cache_file) 920 if not set(['manifest', 'files']).issubset(set(self._state.keys())): 921 # Cache at least partially incomplete, set up new cache 922 self._state = copy.deepcopy(self.EMPTY_STATE) 923 except ValueError: 924 # Cache file empty, set up new cache 925 self._state = copy.deepcopy(self.EMPTY_STATE) 926 else: 927 self.logger.info("No cache usage requested for this run.") 928 # No cache file, set empty state 929 self._state = copy.deepcopy(self.EMPTY_STATE) 930 if not self._cached_manifest_valid(): 931 if not self.batch_mode: 932 raise ResumeCacheInvalidError() 933 else: 934 self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name)) 935 self.use_cache = False # Don't overwrite preexisting cache file. 936 self._state = copy.deepcopy(self.EMPTY_STATE) 937 # Load the previous manifest so we can check if files were modified remotely. 938 self._local_collection = arvados.collection.Collection( 939 self._state['manifest'], 940 replication_desired=self.replication_desired, 941 storage_classes_desired=self.storage_classes, 942 put_threads=self.put_threads, 943 api_client=self._api_client, 944 num_retries=self.num_retries) 945 946 def _cached_manifest_valid(self): 947 """ 948 Validate the oldest non-expired block signature to check if cached manifest 949 is usable: checking if the cached manifest was not created with a different 950 arvados account. 951 """ 952 if self._state.get('manifest', None) is None: 953 # No cached manifest yet, all good. 954 return True 955 now = datetime.datetime.utcnow() 956 oldest_exp = None 957 oldest_loc = None 958 block_found = False 959 for m in arvados.util.keep_locator_pattern.finditer(self._state['manifest']): 960 loc = m.group(0) 961 try: 962 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16)) 963 except IndexError: 964 # Locator without signature 965 continue 966 block_found = True 967 if exp > now and (oldest_exp is None or exp < oldest_exp): 968 oldest_exp = exp 969 oldest_loc = loc 970 if not block_found: 971 # No block signatures found => no invalid block signatures. 
972 return True 973 if oldest_loc is None: 974 # Locator signatures found, but all have expired. 975 # Reset the cache and move on. 976 self.logger.info('Cache expired, starting from scratch.') 977 self._state['manifest'] = '' 978 return True 979 kc = arvados.KeepClient(api_client=self._api_client, 980 num_retries=self.num_retries) 981 try: 982 kc.head(oldest_loc) 983 except arvados.errors.KeepRequestError: 984 # Something is wrong, cached manifest is not valid. 985 return False 986 return True 987 988 def collection_file_paths(self, col, path_prefix='.'): 989 """Return a list of file paths by recursively go through the entire collection `col`""" 990 file_paths = [] 991 for name, item in col.items(): 992 if isinstance(item, arvados.arvfile.ArvadosFile): 993 file_paths.append(os.path.join(path_prefix, name)) 994 elif isinstance(item, arvados.collection.Subcollection): 995 new_prefix = os.path.join(path_prefix, name) 996 file_paths += self.collection_file_paths(item, path_prefix=new_prefix) 997 return file_paths 998 999 def _lock_file(self, fileobj): 1000 try: 1001 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) 1002 except IOError: 1003 raise ResumeCacheConflict(u"{} locked".format(fileobj.name)) 1004 1005 def _save_state(self): 1006 """ 1007 Atomically save current state into cache. 1008 """ 1009 with self._state_lock: 1010 # We're not using copy.deepcopy() here because it's a lot slower 1011 # than json.dumps(), and we're already needing JSON format to be 1012 # saved on disk. 1013 state = json.dumps(self._state) 1014 try: 1015 new_cache = tempfile.NamedTemporaryFile( 1016 mode='w+', 1017 dir=os.path.dirname(self._cache_filename), delete=False) 1018 self._lock_file(new_cache) 1019 new_cache.write(state) 1020 new_cache.flush() 1021 os.fsync(new_cache) 1022 os.rename(new_cache.name, self._cache_filename) 1023 except (IOError, OSError, ResumeCacheConflict) as error: 1024 self.logger.error("There was a problem while saving the cache file: {}".format(error)) 1025 try: 1026 os.unlink(new_cache_name) 1027 except NameError: # mkstemp failed. 
1028 pass 1029 else: 1030 self._cache_file.close() 1031 self._cache_file = new_cache 1032 1033 def collection_name(self): 1034 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None 1035 1036 def collection_trash_at(self): 1037 return self._my_collection().get_trash_at() 1038 1039 def manifest_locator(self): 1040 return self._my_collection().manifest_locator() 1041 1042 def portable_data_hash(self): 1043 pdh = self._my_collection().portable_data_hash() 1044 m = self._my_collection().stripped_manifest().encode() 1045 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m)) 1046 if pdh != local_pdh: 1047 self.logger.warning("\n".join([ 1048 "arv-put: API server provided PDH differs from local manifest.", 1049 " This should not happen; showing API server version."])) 1050 return pdh 1051 1052 def manifest_text(self, stream_name=".", strip=False, normalize=False): 1053 return self._my_collection().manifest_text(stream_name, strip, normalize) 1054 1055 def _datablocks_on_item(self, item): 1056 """ 1057 Return a list of datablock locators, recursively navigating 1058 through subcollections 1059 """ 1060 if isinstance(item, arvados.arvfile.ArvadosFile): 1061 if item.size() == 0: 1062 # Empty file locator 1063 return ["d41d8cd98f00b204e9800998ecf8427e+0"] 1064 else: 1065 locators = [] 1066 for segment in item.segments(): 1067 loc = segment.locator 1068 locators.append(loc) 1069 return locators 1070 elif isinstance(item, arvados.collection.Collection): 1071 l = [self._datablocks_on_item(x) for x in item.values()] 1072 # Fast list flattener method taken from: 1073 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python 1074 return [loc for sublist in l for loc in sublist] 1075 else: 1076 return None 1077 1078 def data_locators(self): 1079 with self._collection_lock: 1080 # Make sure all datablocks are flushed before getting the locators 1081 self._my_collection().manifest_text() 1082 datablocks = self._datablocks_on_item(self._my_collection()) 1083 return datablocks 1084 1085_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0], 1086 os.getpid()) 1087 1088# Simulate glob.glob() matching behavior without the need to scan the filesystem 1089# Note: fnmatch() doesn't work correctly when used with pathnames. For example the 1090# pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py', 1091# so instead we're using it on every path component. 
1092def pathname_match(pathname, pattern): 1093 name = pathname.split(os.sep) 1094 # Fix patterns like 'some/subdir/' or 'some//subdir' 1095 pat = [x for x in pattern.split(os.sep) if x != '' and x != '.'] 1096 if len(name) != len(pat): 1097 return False 1098 for i in range(len(name)): 1099 if not fnmatch.fnmatch(name[i], pat[i]): 1100 return False 1101 return True 1102 1103def machine_progress(bytes_written, bytes_expected): 1104 return _machine_format.format( 1105 bytes_written, -1 if (bytes_expected is None) else bytes_expected) 1106 1107def human_progress(bytes_written, bytes_expected): 1108 if bytes_expected: 1109 return "\r{}M / {}M {:.1%} ".format( 1110 bytes_written >> 20, bytes_expected >> 20, 1111 float(bytes_written) / bytes_expected) 1112 else: 1113 return "\r{} ".format(bytes_written) 1114 1115def progress_writer(progress_func, outfile=sys.stderr): 1116 def write_progress(bytes_written, bytes_expected): 1117 outfile.write(progress_func(bytes_written, bytes_expected)) 1118 return write_progress 1119 1120def desired_project_uuid(api_client, project_uuid, num_retries): 1121 if not project_uuid: 1122 query = api_client.users().current() 1123 elif arvados.util.user_uuid_pattern.match(project_uuid): 1124 query = api_client.users().get(uuid=project_uuid) 1125 elif arvados.util.group_uuid_pattern.match(project_uuid): 1126 query = api_client.groups().get(uuid=project_uuid) 1127 else: 1128 raise ValueError("Not a valid project UUID: {}".format(project_uuid)) 1129 return query.execute(num_retries=num_retries)['uuid'] 1130 1131def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr, 1132 install_sig_handlers=True): 1133 global api_client 1134 1135 args = parse_arguments(arguments) 1136 logger = logging.getLogger('arvados.arv_put') 1137 if args.silent: 1138 logger.setLevel(logging.WARNING) 1139 else: 1140 logger.setLevel(logging.INFO) 1141 status = 0 1142 1143 request_id = arvados.util.new_request_id() 1144 1145 formatter = ArvPutLogFormatter(request_id) 1146 logging.getLogger('arvados').handlers[0].setFormatter(formatter) 1147 1148 if api_client is None: 1149 api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries) 1150 1151 if install_sig_handlers: 1152 arv_cmd.install_signal_handlers() 1153 1154 # Trash arguments validation 1155 trash_at = None 1156 if args.trash_at is not None: 1157 # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we 1158 # make sure the user provides a complete YYYY-MM-DD date. 1159 if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at): 1160 logger.error("--trash-at argument format invalid, use --help to see examples.") 1161 sys.exit(1) 1162 # Check if no time information was provided. In that case, assume end-of-day. 1163 if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at): 1164 args.trash_at += 'T23:59:59' 1165 try: 1166 trash_at = ciso8601.parse_datetime(args.trash_at) 1167 except: 1168 logger.error("--trash-at argument format invalid, use --help to see examples.") 1169 sys.exit(1) 1170 else: 1171 if trash_at.tzinfo is not None: 1172 # Timezone aware datetime provided. 1173 utcoffset = -trash_at.utcoffset() 1174 else: 1175 # Timezone naive datetime provided. Assume is local. 1176 if time.daylight: 1177 utcoffset = datetime.timedelta(seconds=time.altzone) 1178 else: 1179 utcoffset = datetime.timedelta(seconds=time.timezone) 1180 # Convert to UTC timezone naive datetime. 
1181 trash_at = trash_at.replace(tzinfo=None) + utcoffset 1182 1183 if trash_at <= datetime.datetime.utcnow(): 1184 logger.error("--trash-at argument must be set in the future") 1185 sys.exit(1) 1186 if args.trash_after is not None: 1187 if args.trash_after < 1: 1188 logger.error("--trash-after argument must be >= 1") 1189 sys.exit(1) 1190 trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60)) 1191 1192 # Determine the name to use 1193 if args.name: 1194 if args.stream or args.raw: 1195 logger.error("Cannot use --name with --stream or --raw") 1196 sys.exit(1) 1197 elif args.update_collection: 1198 logger.error("Cannot use --name with --update-collection") 1199 sys.exit(1) 1200 collection_name = args.name 1201 else: 1202 collection_name = "Saved at {} by {}@{}".format( 1203 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"), 1204 pwd.getpwuid(os.getuid()).pw_name, 1205 socket.gethostname()) 1206 1207 if args.project_uuid and (args.stream or args.raw): 1208 logger.error("Cannot use --project-uuid with --stream or --raw") 1209 sys.exit(1) 1210 1211 # Determine the parent project 1212 try: 1213 project_uuid = desired_project_uuid(api_client, args.project_uuid, 1214 args.retries) 1215 except (apiclient_errors.Error, ValueError) as error: 1216 logger.error(error) 1217 sys.exit(1) 1218 1219 if args.progress: 1220 reporter = progress_writer(human_progress) 1221 elif args.batch_progress: 1222 reporter = progress_writer(machine_progress) 1223 else: 1224 reporter = None 1225 1226 # Setup exclude regex from all the --exclude arguments provided 1227 name_patterns = [] 1228 exclude_paths = [] 1229 exclude_names = None 1230 if len(args.exclude) > 0: 1231 # We're supporting 2 kinds of exclusion patterns: 1232 # 1) --exclude '*.jpg' (file/dir name patterns, will only match 1233 # the name, wherever the file is on the tree) 1234 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the 1235 # entire path, and should be relative to 1236 # any input dir argument) 1237 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs 1238 # placed directly underneath the input dir) 1239 for p in args.exclude: 1240 # Only relative paths patterns allowed 1241 if p.startswith(os.sep): 1242 logger.error("Cannot use absolute paths with --exclude") 1243 sys.exit(1) 1244 if os.path.dirname(p): 1245 # We don't support of path patterns with '..' 1246 p_parts = p.split(os.sep) 1247 if '..' in p_parts: 1248 logger.error( 1249 "Cannot use path patterns that include or '..'") 1250 sys.exit(1) 1251 # Path search pattern 1252 exclude_paths.append(p) 1253 else: 1254 # Name-only search pattern 1255 name_patterns.append(p) 1256 # For name only matching, we can combine all patterns into a single 1257 # regexp, for better performance. 1258 exclude_names = re.compile('|'.join( 1259 [fnmatch.translate(p) for p in name_patterns] 1260 )) if len(name_patterns) > 0 else None 1261 # Show the user the patterns to be used, just in case they weren't 1262 # specified inside quotes and got changed by the shell expansion. 1263 logger.info("Exclude patterns: {}".format(args.exclude)) 1264 1265 # If this is used by a human, and there's at least one directory to be 1266 # uploaded, the expected bytes calculation can take a moment. 
1267 if args.progress and any([os.path.isdir(f) for f in args.paths]): 1268 logger.info("Calculating upload size, this could take some time...") 1269 try: 1270 writer = ArvPutUploadJob(paths = args.paths, 1271 resume = args.resume, 1272 use_cache = args.use_cache, 1273 batch_mode= args.batch, 1274 filename = args.filename, 1275 reporter = reporter, 1276 api_client = api_client, 1277 num_retries = args.retries, 1278 replication_desired = args.replication, 1279 put_threads = args.threads, 1280 name = collection_name, 1281 owner_uuid = project_uuid, 1282 ensure_unique_name = True, 1283 update_collection = args.update_collection, 1284 storage_classes=args.storage_classes, 1285 logger=logger, 1286 dry_run=args.dry_run, 1287 follow_links=args.follow_links, 1288 exclude_paths=exclude_paths, 1289 exclude_names=exclude_names, 1290 trash_at=trash_at) 1291 except ResumeCacheConflict: 1292 logger.error("\n".join([ 1293 "arv-put: Another process is already uploading this data.", 1294 " Use --no-cache if this is really what you want."])) 1295 sys.exit(1) 1296 except ResumeCacheInvalidError: 1297 logger.error("\n".join([ 1298 "arv-put: Resume cache contains invalid signature: it may have expired", 1299 " or been created with another Arvados user's credentials.", 1300 " Switch user or use one of the following options to restart upload:", 1301 " --no-resume to start a new resume cache.", 1302 " --no-cache to disable resume cache.", 1303 " --batch to ignore the resume cache if invalid."])) 1304 sys.exit(1) 1305 except (CollectionUpdateError, PathDoesNotExistError) as error: 1306 logger.error("\n".join([ 1307 "arv-put: %s" % str(error)])) 1308 sys.exit(1) 1309 except ArvPutUploadIsPending: 1310 # Dry run check successful, return proper exit code. 1311 sys.exit(2) 1312 except ArvPutUploadNotPending: 1313 # No files pending for upload 1314 sys.exit(0) 1315 1316 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0: 1317 logger.warning("\n".join([ 1318 "arv-put: Resuming previous upload from last checkpoint.", 1319 " Use the --no-resume option to start over."])) 1320 1321 if not args.dry_run: 1322 writer.report_progress() 1323 output = None 1324 try: 1325 writer.start(save_collection=not(args.stream or args.raw)) 1326 except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error: 1327 logger.error("\n".join([ 1328 "arv-put: %s" % str(error)])) 1329 sys.exit(1) 1330 1331 if args.progress: # Print newline to split stderr from stdout for humans. 1332 logger.info("\n") 1333 1334 if args.stream: 1335 if args.normalize: 1336 output = writer.manifest_text(normalize=True) 1337 else: 1338 output = writer.manifest_text() 1339 elif args.raw: 1340 output = ','.join(writer.data_locators()) 1341 elif writer.manifest_locator() is not None: 1342 try: 1343 expiration_notice = "" 1344 if writer.collection_trash_at() is not None: 1345 # Get the local timezone-naive version, and log it with timezone information. 1346 if time.daylight: 1347 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone) 1348 else: 1349 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone) 1350 expiration_notice = ". 
It will expire on {} {}.".format( 1351 local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z")) 1352 if args.update_collection: 1353 logger.info(u"Collection updated: '{}'{}".format( 1354 writer.collection_name(), expiration_notice)) 1355 else: 1356 logger.info(u"Collection saved as '{}'{}".format( 1357 writer.collection_name(), expiration_notice)) 1358 if args.portable_data_hash: 1359 output = writer.portable_data_hash() 1360 else: 1361 output = writer.manifest_locator() 1362 except apiclient_errors.Error as error: 1363 logger.error( 1364 "arv-put: Error creating Collection on project: {}.".format( 1365 error)) 1366 status = 1 1367 else: 1368 status = 1 1369 1370 # Print the locator (uuid) of the new collection. 1371 if output is None: 1372 status = status or 1 1373 elif not args.silent: 1374 stdout.write(output) 1375 if not output.endswith('\n'): 1376 stdout.write('\n') 1377 1378 if install_sig_handlers: 1379 arv_cmd.restore_signal_handlers() 1380 1381 if status != 0: 1382 sys.exit(status) 1383 1384 # Success! 1385 return output 1386 1387 1388if __name__ == '__main__': 1389 main()
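
As a reference for the --exclude behavior implemented above: path-like patterns are matched by pathname_match(), which splits both the candidate path and the pattern on os.sep and compares them component by component with fnmatch, so a path pattern only matches entries at the same depth (name-only patterns such as '*.txt' are handled separately through a compiled regular expression). A small sketch follows, with hypothetical paths:

    # Illustrative only; the paths below are hypothetical.
    from arvados.commands.put import pathname_match

    pathname_match('subdir/notes.txt', 'subdir/*.txt')        # True: components match pairwise
    pathname_match('subdir/deep/notes.txt', 'subdir/*.txt')   # False: different number of components
    pathname_match('subdir/notes.txt', './subdir/*.txt')      # True: empty and '.' components are dropped from the pattern
    pathname_match('notes.txt', '*.txt')                      # True: single component, fnmatch on the name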
264def parse_arguments(arguments): 265 args = arg_parser.parse_args(arguments) 266 267 if len(args.paths) == 0: 268 args.paths = ['-'] 269 270 args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths] 271 272 if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])): 273 arg_parser.error(""" 274 --filename argument cannot be used when storing a directory or 275 multiple files. 276 """) 277 278 # Turn on --progress by default if stderr is a tty. 279 if (not (args.batch_progress or args.no_progress or args.silent) 280 and os.isatty(sys.stderr.fileno())): 281 args.progress = True 282 283 # Turn off --resume (default) if --no-cache is used. 284 if not args.use_cache: 285 args.resume = False 286 287 if args.paths == ['-']: 288 if args.update_collection: 289 arg_parser.error(""" 290 --update-collection cannot be used when reading from stdin. 291 """) 292 args.resume = False 293 args.use_cache = False 294 if not args.filename: 295 args.filename = 'stdin' 296 297 # Remove possible duplicated patterns 298 if len(args.exclude) > 0: 299 args.exclude = list(set(args.exclude)) 300 301 return args
Common base class for all non-exit exceptions.
Common base class for all non-exit exceptions.
Common base class for all non-exit exceptions.
Common base class for all non-exit exceptions.
Common base class for all non-exit exceptions.
Common base class for all non-exit exceptions.
Common base class for all non-exit exceptions.
331class FileUploadList(list): 332 def __init__(self, dry_run=False): 333 list.__init__(self) 334 self.dry_run = dry_run 335 336 def append(self, other): 337 if self.dry_run: 338 raise ArvPutUploadIsPending() 339 super(FileUploadList, self).append(other)
Built-in mutable sequence.
If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.
343class ArvPutLogFormatter(logging.Formatter): 344 std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format) 345 err_fmtr = None 346 request_id_informed = False 347 348 def __init__(self, request_id): 349 self.err_fmtr = logging.Formatter( 350 arvados.log_format+' (X-Request-Id: {})'.format(request_id), 351 arvados.log_date_format) 352 353 def format(self, record): 354 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)): 355 self.request_id_informed = True 356 return self.err_fmtr.format(record) 357 return self.std_fmtr.format(record)
Formatter instances are used to convert a LogRecord to text.
Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, “%(message)s”, “{message}”, or “${message}”, is used.
The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user’s message and arguments are pre- formatted into a LogRecord’s message attribute. Currently, the useful attributes in a LogRecord are described by:
%(name)s Name of the logger (logging channel) %(levelno)s Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL) %(levelname)s Text logging level for the message ("DEBUG”, “INFO”, “WARNING”, “ERROR”, “CRITICAL") %(pathname)s Full pathname of the source file where the logging call was issued (if available) %(filename)s Filename portion of pathname %(module)s Module (name portion of filename) %(lineno)d Source line number where the logging call was issued (if available) %(funcName)s Function name %(created)f Time when the LogRecord was created (time.time() return value) %(asctime)s Textual time when the LogRecord was created %(msecs)d Millisecond portion of the creation time %(relativeCreated)d Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time) %(thread)d Thread ID (if available) %(threadName)s Thread name (if available) %(process)d Process ID (if available) %(message)s The result of record.getMessage(), computed just as the record is emitted
348 def __init__(self, request_id): 349 self.err_fmtr = logging.Formatter( 350 arvados.log_format+' (X-Request-Id: {})'.format(request_id), 351 arvados.log_date_format)
Initialize the formatter with specified format strings.
Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.
Use a style parameter of ‘%’, ‘{’ or ‘$’ to specify that you want to
use one of %-formatting, str.format()
({}
) formatting or
string.Template
formatting in your format string.
Changed in version 3.2:
Added the style
parameter.
353 def format(self, record): 354 if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)): 355 self.request_id_informed = True 356 return self.err_fmtr.format(record) 357 return self.std_fmtr.format(record)
Format the specified record as text.
The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
360class ResumeCache(object): 361 CACHE_DIR = 'arv-put' 362 363 def __init__(self, file_spec): 364 self.cache_file = open(file_spec, 'a+') 365 self._lock_file(self.cache_file) 366 self.filename = self.cache_file.name 367 368 @classmethod 369 def make_path(cls, args): 370 md5 = hashlib.md5() 371 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode()) 372 realpaths = sorted(os.path.realpath(path) for path in args.paths) 373 md5.update(b'\0'.join([p.encode() for p in realpaths])) 374 if any(os.path.isdir(path) for path in realpaths): 375 md5.update(b'-1') 376 elif args.filename: 377 md5.update(args.filename.encode()) 378 cache_path = Path(cls.CACHE_DIR) 379 if len(cache_path.parts) == 1: 380 cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path) 381 else: 382 # Note this is a noop if cache_path is absolute, which is what we want. 383 cache_path = Path.home() / cache_path 384 cache_path.mkdir(parents=True, exist_ok=True, mode=0o700) 385 return str(cache_path / md5.hexdigest()) 386 387 def _lock_file(self, fileobj): 388 try: 389 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) 390 except IOError: 391 raise ResumeCacheConflict(u"{} locked".format(fileobj.name)) 392 393 def load(self): 394 self.cache_file.seek(0) 395 return json.load(self.cache_file) 396 397 def check_cache(self, api_client=None, num_retries=0): 398 try: 399 state = self.load() 400 locator = None 401 try: 402 if "_finished_streams" in state and len(state["_finished_streams"]) > 0: 403 locator = state["_finished_streams"][0][1][0] 404 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0: 405 locator = state["_current_stream_locators"][0] 406 if locator is not None: 407 kc = arvados.keep.KeepClient(api_client=api_client) 408 kc.head(locator, num_retries=num_retries) 409 except Exception as e: 410 self.restart() 411 except (ValueError): 412 pass 413 414 def save(self, data): 415 try: 416 new_cache_fd, new_cache_name = tempfile.mkstemp( 417 dir=os.path.dirname(self.filename)) 418 self._lock_file(new_cache_fd) 419 new_cache = os.fdopen(new_cache_fd, 'r+') 420 json.dump(data, new_cache) 421 os.rename(new_cache_name, self.filename) 422 except (IOError, OSError, ResumeCacheConflict): 423 try: 424 os.unlink(new_cache_name) 425 except NameError: # mkstemp failed. 426 pass 427 else: 428 self.cache_file.close() 429 self.cache_file = new_cache 430 431 def close(self): 432 self.cache_file.close() 433 434 def destroy(self): 435 try: 436 os.unlink(self.filename) 437 except OSError as error: 438 if error.errno != errno.ENOENT: # That's what we wanted anyway. 439 raise 440 self.close() 441 442 def restart(self): 443 self.destroy() 444 self.__init__(self.filename)
447class ArvPutUploadJob(object): 448 CACHE_DIR = 'arv-put' 449 EMPTY_STATE = { 450 'manifest' : None, # Last saved manifest checkpoint 451 'files' : {} # Previous run file list: {path : {size, mtime}} 452 } 453 454 def __init__(self, paths, resume=True, use_cache=True, reporter=None, 455 name=None, owner_uuid=None, api_client=None, batch_mode=False, 456 ensure_unique_name=False, num_retries=None, 457 put_threads=None, replication_desired=None, filename=None, 458 update_time=60.0, update_collection=None, storage_classes=None, 459 logger=logging.getLogger('arvados.arv_put'), dry_run=False, 460 follow_links=True, exclude_paths=[], exclude_names=None, 461 trash_at=None): 462 self.paths = paths 463 self.resume = resume 464 self.use_cache = use_cache 465 self.batch_mode = batch_mode 466 self.update = False 467 self.reporter = reporter 468 # This will set to 0 before start counting, if no special files are going 469 # to be read. 470 self.bytes_expected = None 471 self.bytes_written = 0 472 self.bytes_skipped = 0 473 self.name = name 474 self.owner_uuid = owner_uuid 475 self.ensure_unique_name = ensure_unique_name 476 self.num_retries = num_retries 477 self.replication_desired = replication_desired 478 self.put_threads = put_threads 479 self.filename = filename 480 self.storage_classes = storage_classes 481 self._api_client = api_client 482 self._state_lock = threading.Lock() 483 self._state = None # Previous run state (file list & manifest) 484 self._current_files = [] # Current run file list 485 self._cache_file = None 486 self._collection_lock = threading.Lock() 487 self._remote_collection = None # Collection being updated (if asked) 488 self._local_collection = None # Collection from previous run manifest 489 self._file_paths = set() # Files to be updated in remote collection 490 self._stop_checkpointer = threading.Event() 491 self._checkpointer = threading.Thread(target=self._update_task) 492 self._checkpointer.daemon = True 493 self._update_task_time = update_time # How many seconds wait between update runs 494 self._files_to_upload = FileUploadList(dry_run=dry_run) 495 self._upload_started = False 496 self.logger = logger 497 self.dry_run = dry_run 498 self._checkpoint_before_quit = True 499 self.follow_links = follow_links 500 self.exclude_paths = exclude_paths 501 self.exclude_names = exclude_names 502 self._trash_at = trash_at 503 504 if self._trash_at is not None: 505 if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]: 506 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta') 507 if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None: 508 raise TypeError('provided trash_at datetime should be timezone-naive') 509 510 if not self.use_cache and self.resume: 511 raise ArvPutArgumentConflict('resume cannot be True when use_cache is False') 512 513 # Check for obvious dry-run responses 514 if self.dry_run and (not self.use_cache or not self.resume): 515 raise ArvPutUploadIsPending() 516 517 # Load cached data if any and if needed 518 self._setup_state(update_collection) 519 520 # Build the upload file list, excluding requested files and counting the 521 # bytes expected to be uploaded. 522 self._build_upload_list() 523 524 def _build_upload_list(self): 525 """ 526 Scan the requested paths to count file sizes, excluding requested files 527 and dirs and building the upload file list. 528 """ 529 # If there aren't special files to be read, reset total bytes count to zero 530 # to start counting. 
531 if not any([p for p in self.paths 532 if not (os.path.isfile(p) or os.path.isdir(p))]): 533 self.bytes_expected = 0 534 535 for path in self.paths: 536 # Test for stdin first, in case some file named '-' exist 537 if path == '-': 538 if self.dry_run: 539 raise ArvPutUploadIsPending() 540 self._write_stdin(self.filename or 'stdin') 541 elif not os.path.exists(path): 542 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path)) 543 elif (not self.follow_links) and os.path.islink(path): 544 self.logger.warning("Skipping symlink '{}'".format(path)) 545 continue 546 elif os.path.isdir(path): 547 # Use absolute paths on cache index so CWD doesn't interfere 548 # with the caching logic. 549 orig_path = path 550 path = os.path.abspath(path) 551 if orig_path[-1:] == os.sep: 552 # When passing a directory reference with a trailing slash, 553 # its contents should be uploaded directly to the 554 # collection's root. 555 prefixdir = path 556 else: 557 # When passing a directory reference with no trailing slash, 558 # upload the directory to the collection's root. 559 prefixdir = os.path.dirname(path) 560 prefixdir += os.sep 561 for root, dirs, files in os.walk(path, 562 followlinks=self.follow_links): 563 root_relpath = os.path.relpath(root, path) 564 if root_relpath == '.': 565 root_relpath = '' 566 # Exclude files/dirs by full path matching pattern 567 if self.exclude_paths: 568 dirs[:] = [d for d in dirs 569 if not any(pathname_match( 570 os.path.join(root_relpath, d), pat) 571 for pat in self.exclude_paths)] 572 files = [f for f in files 573 if not any(pathname_match( 574 os.path.join(root_relpath, f), pat) 575 for pat in self.exclude_paths)] 576 # Exclude files/dirs by name matching pattern 577 if self.exclude_names is not None: 578 dirs[:] = [d for d in dirs 579 if not self.exclude_names.match(d)] 580 files = [f for f in files 581 if not self.exclude_names.match(f)] 582 # Make os.walk()'s dir traversing order deterministic 583 dirs.sort() 584 files.sort() 585 for f in files: 586 filepath = os.path.join(root, f) 587 if not os.path.isfile(filepath): 588 self.logger.warning("Skipping non-regular file '{}'".format(filepath)) 589 continue 590 # Add its size to the total bytes count (if applicable) 591 if self.follow_links or (not os.path.islink(filepath)): 592 if self.bytes_expected is not None: 593 self.bytes_expected += os.path.getsize(filepath) 594 self._check_file(filepath, 595 os.path.join(root[len(prefixdir):], f)) 596 else: 597 filepath = os.path.abspath(path) 598 # Add its size to the total bytes count (if applicable) 599 if self.follow_links or (not os.path.islink(filepath)): 600 if self.bytes_expected is not None: 601 self.bytes_expected += os.path.getsize(filepath) 602 self._check_file(filepath, 603 self.filename or os.path.basename(path)) 604 # If dry-mode is on, and got up to this point, then we should notify that 605 # there aren't any file to upload. 606 if self.dry_run: 607 raise ArvPutUploadNotPending() 608 # Remove local_collection's files that don't exist locally anymore, so the 609 # bytes_written count is correct. 610 for f in self.collection_file_paths(self._local_collection, 611 path_prefix=""): 612 if f != 'stdin' and f != self.filename and not f in self._file_paths: 613 self._local_collection.remove(f) 614 615 def start(self, save_collection): 616 """ 617 Start supporting thread & file uploading 618 """ 619 self._checkpointer.start() 620 try: 621 # Update bytes_written from current local collection and 622 # report initial progress. 
623 self._update() 624 # Actual file upload 625 self._upload_started = True # Used by the update thread to start checkpointing 626 self._upload_files() 627 except (SystemExit, Exception) as e: 628 self._checkpoint_before_quit = False 629 # Log stack trace only when Ctrl-C isn't pressed (SIGINT) 630 # Note: We're expecting SystemExit instead of 631 # KeyboardInterrupt because we have a custom signal 632 # handler in place that raises SystemExit with the catched 633 # signal's code. 634 if isinstance(e, PathDoesNotExistError): 635 # We aren't interested in the traceback for this case 636 pass 637 elif not isinstance(e, SystemExit) or e.code != -2: 638 self.logger.warning("Abnormal termination:\n{}".format( 639 traceback.format_exc())) 640 raise 641 finally: 642 if not self.dry_run: 643 # Stop the thread before doing anything else 644 self._stop_checkpointer.set() 645 self._checkpointer.join() 646 if self._checkpoint_before_quit: 647 # Commit all pending blocks & one last _update() 648 self._local_collection.manifest_text() 649 self._update(final=True) 650 if save_collection: 651 self.save_collection() 652 if self.use_cache: 653 self._cache_file.close() 654 655 def _collection_trash_at(self): 656 """ 657 Returns the trash date that the collection should use at save time. 658 Takes into account absolute/relative trash_at values requested 659 by the user. 660 """ 661 if type(self._trash_at) == datetime.timedelta: 662 # Get an absolute datetime for trash_at 663 return datetime.datetime.utcnow() + self._trash_at 664 return self._trash_at 665 666 def save_collection(self): 667 if self.update: 668 # Check if files should be updated on the remote collection. 669 for fp in self._file_paths: 670 remote_file = self._remote_collection.find(fp) 671 if not remote_file: 672 # File don't exist on remote collection, copy it. 673 self._remote_collection.copy(fp, fp, self._local_collection) 674 elif remote_file != self._local_collection.find(fp): 675 # A different file exist on remote collection, overwrite it. 676 self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True) 677 else: 678 # The file already exist on remote collection, skip it. 679 pass 680 self._remote_collection.save(num_retries=self.num_retries, 681 trash_at=self._collection_trash_at()) 682 else: 683 if len(self._local_collection) == 0: 684 self.logger.warning("No files were uploaded, skipping collection creation.") 685 return 686 self._local_collection.save_new( 687 name=self.name, owner_uuid=self.owner_uuid, 688 ensure_unique_name=self.ensure_unique_name, 689 num_retries=self.num_retries, 690 trash_at=self._collection_trash_at()) 691 692 def destroy_cache(self): 693 if self.use_cache: 694 try: 695 os.unlink(self._cache_filename) 696 except OSError as error: 697 # That's what we wanted anyway. 698 if error.errno != errno.ENOENT: 699 raise 700 self._cache_file.close() 701 702 def _collection_size(self, collection): 703 """ 704 Recursively get the total size of the collection 705 """ 706 size = 0 707 for item in collection.values(): 708 if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection): 709 size += self._collection_size(item) 710 else: 711 size += item.size() 712 return size 713 714 def _update_task(self): 715 """ 716 Periodically called support task. File uploading is 717 asynchronous so we poll status from the collection. 
718 """ 719 while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time): 720 self._update() 721 722 def _update(self, final=False): 723 """ 724 Update cached manifest text and report progress. 725 """ 726 if self._upload_started: 727 with self._collection_lock: 728 self.bytes_written = self._collection_size(self._local_collection) 729 if self.use_cache: 730 if final: 731 manifest = self._local_collection.manifest_text() 732 else: 733 # Get the manifest text without comitting pending blocks 734 manifest = self._local_collection.manifest_text(strip=False, 735 normalize=False, 736 only_committed=True) 737 # Update cache 738 with self._state_lock: 739 self._state['manifest'] = manifest 740 if self.use_cache: 741 try: 742 self._save_state() 743 except Exception as e: 744 self.logger.error("Unexpected error trying to save cache file: {}".format(e)) 745 # Keep remote collection's trash_at attribute synced when using relative expire dates 746 if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta: 747 try: 748 self._api_client.collections().update( 749 uuid=self._remote_collection.manifest_locator(), 750 body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")} 751 ).execute(num_retries=self.num_retries) 752 except Exception as e: 753 self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e)) 754 else: 755 self.bytes_written = self.bytes_skipped 756 # Call the reporter, if any 757 self.report_progress() 758 759 def report_progress(self): 760 if self.reporter is not None: 761 self.reporter(self.bytes_written, self.bytes_expected) 762 763 def _write_stdin(self, filename): 764 output = self._local_collection.open(filename, 'wb') 765 self._write(sys.stdin.buffer, output) 766 output.close() 767 768 def _check_file(self, source, filename): 769 """ 770 Check if this file needs to be uploaded 771 """ 772 # Ignore symlinks when requested 773 if (not self.follow_links) and os.path.islink(source): 774 return 775 resume_offset = 0 776 should_upload = False 777 new_file_in_cache = False 778 # Record file path for updating the remote collection before exiting 779 self._file_paths.add(filename) 780 781 with self._state_lock: 782 # If no previous cached data on this file, store it for an eventual 783 # repeated run. 784 if source not in self._state['files']: 785 self._state['files'][source] = { 786 'mtime': os.path.getmtime(source), 787 'size' : os.path.getsize(source) 788 } 789 new_file_in_cache = True 790 cached_file_data = self._state['files'][source] 791 792 # Check if file was already uploaded (at least partially) 793 file_in_local_collection = self._local_collection.find(filename) 794 795 # If not resuming, upload the full file. 796 if not self.resume: 797 should_upload = True 798 # New file detected from last run, upload it. 799 elif new_file_in_cache: 800 should_upload = True 801 # Local file didn't change from last run. 802 elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source): 803 if not file_in_local_collection: 804 # File not uploaded yet, upload it completely 805 should_upload = True 806 elif file_in_local_collection.permission_expired(): 807 # Permission token expired, re-upload file. This will change whenever 808 # we have a API for refreshing tokens. 
809 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename)) 810 should_upload = True 811 self._local_collection.remove(filename) 812 elif cached_file_data['size'] == file_in_local_collection.size(): 813 # File already there, skip it. 814 self.bytes_skipped += cached_file_data['size'] 815 elif cached_file_data['size'] > file_in_local_collection.size(): 816 # File partially uploaded, resume! 817 resume_offset = file_in_local_collection.size() 818 self.bytes_skipped += resume_offset 819 should_upload = True 820 else: 821 # Inconsistent cache, re-upload the file 822 should_upload = True 823 self._local_collection.remove(filename) 824 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source)) 825 # Local file differs from cached data, re-upload it. 826 else: 827 if file_in_local_collection: 828 self._local_collection.remove(filename) 829 should_upload = True 830 831 if should_upload: 832 try: 833 self._files_to_upload.append((source, resume_offset, filename)) 834 except ArvPutUploadIsPending: 835 # This could happen when running on dry-mode, close cache file to 836 # avoid locking issues. 837 self._cache_file.close() 838 raise 839 840 def _upload_files(self): 841 for source, resume_offset, filename in self._files_to_upload: 842 with open(source, 'rb') as source_fd: 843 with self._state_lock: 844 self._state['files'][source]['mtime'] = os.path.getmtime(source) 845 self._state['files'][source]['size'] = os.path.getsize(source) 846 if resume_offset > 0: 847 # Start upload where we left off 848 output = self._local_collection.open(filename, 'ab') 849 source_fd.seek(resume_offset) 850 else: 851 # Start from scratch 852 output = self._local_collection.open(filename, 'wb') 853 self._write(source_fd, output) 854 output.close(flush=False) 855 856 def _write(self, source_fd, output): 857 while True: 858 data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE) 859 if not data: 860 break 861 output.write(data) 862 863 def _my_collection(self): 864 return self._remote_collection if self.update else self._local_collection 865 866 def _get_cache_filepath(self): 867 # Set up cache file name from input paths. 868 md5 = hashlib.md5() 869 md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode()) 870 realpaths = sorted(os.path.realpath(path) for path in self.paths) 871 md5.update(b'\0'.join([p.encode() for p in realpaths])) 872 if self.filename: 873 md5.update(self.filename.encode()) 874 cache_path = Path(self.CACHE_DIR) 875 if len(cache_path.parts) == 1: 876 cache_path = basedirs.BaseDirectories('CACHE').storage_path(cache_path) 877 else: 878 # Note this is a noop if cache_path is absolute, which is what we want. 879 cache_path = Path.home() / cache_path 880 cache_path.mkdir(parents=True, exist_ok=True, mode=0o700) 881 return str(cache_path / md5.hexdigest()) 882 883 def _setup_state(self, update_collection): 884 """ 885 Create a new cache file or load a previously existing one. 
886 """ 887 # Load an already existing collection for update 888 if update_collection and re.match(arvados.util.collection_uuid_pattern, 889 update_collection): 890 try: 891 self._remote_collection = arvados.collection.Collection( 892 update_collection, 893 api_client=self._api_client, 894 storage_classes_desired=self.storage_classes, 895 num_retries=self.num_retries) 896 except arvados.errors.ApiError as error: 897 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error)) 898 else: 899 self.update = True 900 elif update_collection: 901 # Collection locator provided, but unknown format 902 raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection)) 903 904 if self.use_cache: 905 cache_filepath = self._get_cache_filepath() 906 if self.resume and os.path.exists(cache_filepath): 907 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath)) 908 self._cache_file = open(cache_filepath, 'a+') 909 else: 910 # --no-resume means start with a empty cache file. 911 self.logger.info(u"Creating new cache file at {}".format(cache_filepath)) 912 self._cache_file = open(cache_filepath, 'w+') 913 self._cache_filename = self._cache_file.name 914 self._lock_file(self._cache_file) 915 self._cache_file.seek(0) 916 917 with self._state_lock: 918 if self.use_cache: 919 try: 920 self._state = json.load(self._cache_file) 921 if not set(['manifest', 'files']).issubset(set(self._state.keys())): 922 # Cache at least partially incomplete, set up new cache 923 self._state = copy.deepcopy(self.EMPTY_STATE) 924 except ValueError: 925 # Cache file empty, set up new cache 926 self._state = copy.deepcopy(self.EMPTY_STATE) 927 else: 928 self.logger.info("No cache usage requested for this run.") 929 # No cache file, set empty state 930 self._state = copy.deepcopy(self.EMPTY_STATE) 931 if not self._cached_manifest_valid(): 932 if not self.batch_mode: 933 raise ResumeCacheInvalidError() 934 else: 935 self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name)) 936 self.use_cache = False # Don't overwrite preexisting cache file. 937 self._state = copy.deepcopy(self.EMPTY_STATE) 938 # Load the previous manifest so we can check if files were modified remotely. 939 self._local_collection = arvados.collection.Collection( 940 self._state['manifest'], 941 replication_desired=self.replication_desired, 942 storage_classes_desired=self.storage_classes, 943 put_threads=self.put_threads, 944 api_client=self._api_client, 945 num_retries=self.num_retries) 946 947 def _cached_manifest_valid(self): 948 """ 949 Validate the oldest non-expired block signature to check if cached manifest 950 is usable: checking if the cached manifest was not created with a different 951 arvados account. 952 """ 953 if self._state.get('manifest', None) is None: 954 # No cached manifest yet, all good. 955 return True 956 now = datetime.datetime.utcnow() 957 oldest_exp = None 958 oldest_loc = None 959 block_found = False 960 for m in arvados.util.keep_locator_pattern.finditer(self._state['manifest']): 961 loc = m.group(0) 962 try: 963 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16)) 964 except IndexError: 965 # Locator without signature 966 continue 967 block_found = True 968 if exp > now and (oldest_exp is None or exp < oldest_exp): 969 oldest_exp = exp 970 oldest_loc = loc 971 if not block_found: 972 # No block signatures found => no invalid block signatures. 
973 return True 974 if oldest_loc is None: 975 # Locator signatures found, but all have expired. 976 # Reset the cache and move on. 977 self.logger.info('Cache expired, starting from scratch.') 978 self._state['manifest'] = '' 979 return True 980 kc = arvados.KeepClient(api_client=self._api_client, 981 num_retries=self.num_retries) 982 try: 983 kc.head(oldest_loc) 984 except arvados.errors.KeepRequestError: 985 # Something is wrong, cached manifest is not valid. 986 return False 987 return True 988 989 def collection_file_paths(self, col, path_prefix='.'): 990 """Return a list of file paths by recursively go through the entire collection `col`""" 991 file_paths = [] 992 for name, item in col.items(): 993 if isinstance(item, arvados.arvfile.ArvadosFile): 994 file_paths.append(os.path.join(path_prefix, name)) 995 elif isinstance(item, arvados.collection.Subcollection): 996 new_prefix = os.path.join(path_prefix, name) 997 file_paths += self.collection_file_paths(item, path_prefix=new_prefix) 998 return file_paths 999 1000 def _lock_file(self, fileobj): 1001 try: 1002 fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) 1003 except IOError: 1004 raise ResumeCacheConflict(u"{} locked".format(fileobj.name)) 1005 1006 def _save_state(self): 1007 """ 1008 Atomically save current state into cache. 1009 """ 1010 with self._state_lock: 1011 # We're not using copy.deepcopy() here because it's a lot slower 1012 # than json.dumps(), and we're already needing JSON format to be 1013 # saved on disk. 1014 state = json.dumps(self._state) 1015 try: 1016 new_cache = tempfile.NamedTemporaryFile( 1017 mode='w+', 1018 dir=os.path.dirname(self._cache_filename), delete=False) 1019 self._lock_file(new_cache) 1020 new_cache.write(state) 1021 new_cache.flush() 1022 os.fsync(new_cache) 1023 os.rename(new_cache.name, self._cache_filename) 1024 except (IOError, OSError, ResumeCacheConflict) as error: 1025 self.logger.error("There was a problem while saving the cache file: {}".format(error)) 1026 try: 1027 os.unlink(new_cache_name) 1028 except NameError: # mkstemp failed. 
1029 pass 1030 else: 1031 self._cache_file.close() 1032 self._cache_file = new_cache 1033 1034 def collection_name(self): 1035 return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None 1036 1037 def collection_trash_at(self): 1038 return self._my_collection().get_trash_at() 1039 1040 def manifest_locator(self): 1041 return self._my_collection().manifest_locator() 1042 1043 def portable_data_hash(self): 1044 pdh = self._my_collection().portable_data_hash() 1045 m = self._my_collection().stripped_manifest().encode() 1046 local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m)) 1047 if pdh != local_pdh: 1048 self.logger.warning("\n".join([ 1049 "arv-put: API server provided PDH differs from local manifest.", 1050 " This should not happen; showing API server version."])) 1051 return pdh 1052 1053 def manifest_text(self, stream_name=".", strip=False, normalize=False): 1054 return self._my_collection().manifest_text(stream_name, strip, normalize) 1055 1056 def _datablocks_on_item(self, item): 1057 """ 1058 Return a list of datablock locators, recursively navigating 1059 through subcollections 1060 """ 1061 if isinstance(item, arvados.arvfile.ArvadosFile): 1062 if item.size() == 0: 1063 # Empty file locator 1064 return ["d41d8cd98f00b204e9800998ecf8427e+0"] 1065 else: 1066 locators = [] 1067 for segment in item.segments(): 1068 loc = segment.locator 1069 locators.append(loc) 1070 return locators 1071 elif isinstance(item, arvados.collection.Collection): 1072 l = [self._datablocks_on_item(x) for x in item.values()] 1073 # Fast list flattener method taken from: 1074 # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python 1075 return [loc for sublist in l for loc in sublist] 1076 else: 1077 return None 1078 1079 def data_locators(self): 1080 with self._collection_lock: 1081 # Make sure all datablocks are flushed before getting the locators 1082 self._my_collection().manifest_text() 1083 datablocks = self._datablocks_on_item(self._my_collection()) 1084 return datablocks
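Putting the class together: the constructor validates trash_at, loads or creates the resume cache via _setup_state(), and builds the upload list (raising ArvPutUploadIsPending or ArvPutUploadNotPending in dry-run mode); start() launches the checkpointer thread, uploads the files, and optionally saves the collection; the accessor methods then expose the result. A minimal programmatic sketch, assuming a configured Arvados client; the local path and collection name are illustrative.

import arvados
from arvados.commands.put import ArvPutUploadJob

api = arvados.api('v1')
writer = ArvPutUploadJob(
    paths=['./results/'],           # trailing slash: upload the directory's contents
    api_client=api,
    name='example upload',          # illustrative collection name
    ensure_unique_name=True,
    num_retries=3,
)
writer.start(save_collection=True)  # runs the checkpointer thread and the upload
print(writer.manifest_locator())    # the saved collection's UUID
print(writer.portable_data_hash())  # content-addressed hash of the manifest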
1093def pathname_match(pathname, pattern):
1094    name = pathname.split(os.sep)
1095    # Fix patterns like 'some/subdir/' or 'some//subdir'
1096    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1097    if len(name) != len(pat):
1098        return False
1099    for i in range(len(name)):
1100        if not fnmatch.fnmatch(name[i], pat[i]):
1101            return False
1102    return True
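pathname_match() compares the path and the pattern component by component with fnmatch after dropping empty and '.' components from the pattern, so a pattern only matches paths of the same depth. A few illustrative calls:

pathname_match('subdir/report.txt', 'subdir/*.txt')     # True
pathname_match('report.txt', '*.txt')                   # True
pathname_match('subdir/report.txt', '*.txt')            # False: component counts differ
pathname_match('subdir/report.txt', './subdir/*.txt')   # True: the leading '.' is dropped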
1121def desired_project_uuid(api_client, project_uuid, num_retries):
1122    if not project_uuid:
1123        query = api_client.users().current()
1124    elif arvados.util.user_uuid_pattern.match(project_uuid):
1125        query = api_client.users().get(uuid=project_uuid)
1126    elif arvados.util.group_uuid_pattern.match(project_uuid):
1127        query = api_client.groups().get(uuid=project_uuid)
1128    else:
1129        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1130    return query.execute(num_retries=num_retries)['uuid']
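desired_project_uuid() resolves the --project-uuid argument into the owner UUID for the new collection: no value means the current user's home project, a user UUID is looked up through users().get(), a group UUID through groups().get(), and anything else raises ValueError. A hedged sketch; the explicit project UUID shown is a placeholder.

import arvados
from arvados.commands.put import desired_project_uuid

api = arvados.api('v1')
owner = desired_project_uuid(api, None, num_retries=3)   # current user's UUID (home project)
# owner = desired_project_uuid(api, 'zzzzz-j7d0g-xxxxxxxxxxxxxxx', 3)   # explicit project UUID (placeholder)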
1132def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr, 1133 install_sig_handlers=True): 1134 global api_client 1135 1136 args = parse_arguments(arguments) 1137 logger = logging.getLogger('arvados.arv_put') 1138 if args.silent: 1139 logger.setLevel(logging.WARNING) 1140 else: 1141 logger.setLevel(logging.INFO) 1142 status = 0 1143 1144 request_id = arvados.util.new_request_id() 1145 1146 formatter = ArvPutLogFormatter(request_id) 1147 logging.getLogger('arvados').handlers[0].setFormatter(formatter) 1148 1149 if api_client is None: 1150 api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries) 1151 1152 if install_sig_handlers: 1153 arv_cmd.install_signal_handlers() 1154 1155 # Trash arguments validation 1156 trash_at = None 1157 if args.trash_at is not None: 1158 # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we 1159 # make sure the user provides a complete YYYY-MM-DD date. 1160 if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at): 1161 logger.error("--trash-at argument format invalid, use --help to see examples.") 1162 sys.exit(1) 1163 # Check if no time information was provided. In that case, assume end-of-day. 1164 if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at): 1165 args.trash_at += 'T23:59:59' 1166 try: 1167 trash_at = ciso8601.parse_datetime(args.trash_at) 1168 except: 1169 logger.error("--trash-at argument format invalid, use --help to see examples.") 1170 sys.exit(1) 1171 else: 1172 if trash_at.tzinfo is not None: 1173 # Timezone aware datetime provided. 1174 utcoffset = -trash_at.utcoffset() 1175 else: 1176 # Timezone naive datetime provided. Assume is local. 1177 if time.daylight: 1178 utcoffset = datetime.timedelta(seconds=time.altzone) 1179 else: 1180 utcoffset = datetime.timedelta(seconds=time.timezone) 1181 # Convert to UTC timezone naive datetime. 
1182 trash_at = trash_at.replace(tzinfo=None) + utcoffset 1183 1184 if trash_at <= datetime.datetime.utcnow(): 1185 logger.error("--trash-at argument must be set in the future") 1186 sys.exit(1) 1187 if args.trash_after is not None: 1188 if args.trash_after < 1: 1189 logger.error("--trash-after argument must be >= 1") 1190 sys.exit(1) 1191 trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60)) 1192 1193 # Determine the name to use 1194 if args.name: 1195 if args.stream or args.raw: 1196 logger.error("Cannot use --name with --stream or --raw") 1197 sys.exit(1) 1198 elif args.update_collection: 1199 logger.error("Cannot use --name with --update-collection") 1200 sys.exit(1) 1201 collection_name = args.name 1202 else: 1203 collection_name = "Saved at {} by {}@{}".format( 1204 datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"), 1205 pwd.getpwuid(os.getuid()).pw_name, 1206 socket.gethostname()) 1207 1208 if args.project_uuid and (args.stream or args.raw): 1209 logger.error("Cannot use --project-uuid with --stream or --raw") 1210 sys.exit(1) 1211 1212 # Determine the parent project 1213 try: 1214 project_uuid = desired_project_uuid(api_client, args.project_uuid, 1215 args.retries) 1216 except (apiclient_errors.Error, ValueError) as error: 1217 logger.error(error) 1218 sys.exit(1) 1219 1220 if args.progress: 1221 reporter = progress_writer(human_progress) 1222 elif args.batch_progress: 1223 reporter = progress_writer(machine_progress) 1224 else: 1225 reporter = None 1226 1227 # Setup exclude regex from all the --exclude arguments provided 1228 name_patterns = [] 1229 exclude_paths = [] 1230 exclude_names = None 1231 if len(args.exclude) > 0: 1232 # We're supporting 2 kinds of exclusion patterns: 1233 # 1) --exclude '*.jpg' (file/dir name patterns, will only match 1234 # the name, wherever the file is on the tree) 1235 # 2.1) --exclude 'foo/bar' (file/dir path patterns, will match the 1236 # entire path, and should be relative to 1237 # any input dir argument) 1238 # 2.2) --exclude './*.jpg' (Special case for excluding files/dirs 1239 # placed directly underneath the input dir) 1240 for p in args.exclude: 1241 # Only relative paths patterns allowed 1242 if p.startswith(os.sep): 1243 logger.error("Cannot use absolute paths with --exclude") 1244 sys.exit(1) 1245 if os.path.dirname(p): 1246 # We don't support of path patterns with '..' 1247 p_parts = p.split(os.sep) 1248 if '..' in p_parts: 1249 logger.error( 1250 "Cannot use path patterns that include or '..'") 1251 sys.exit(1) 1252 # Path search pattern 1253 exclude_paths.append(p) 1254 else: 1255 # Name-only search pattern 1256 name_patterns.append(p) 1257 # For name only matching, we can combine all patterns into a single 1258 # regexp, for better performance. 1259 exclude_names = re.compile('|'.join( 1260 [fnmatch.translate(p) for p in name_patterns] 1261 )) if len(name_patterns) > 0 else None 1262 # Show the user the patterns to be used, just in case they weren't 1263 # specified inside quotes and got changed by the shell expansion. 1264 logger.info("Exclude patterns: {}".format(args.exclude)) 1265 1266 # If this is used by a human, and there's at least one directory to be 1267 # uploaded, the expected bytes calculation can take a moment. 
1268 if args.progress and any([os.path.isdir(f) for f in args.paths]): 1269 logger.info("Calculating upload size, this could take some time...") 1270 try: 1271 writer = ArvPutUploadJob(paths = args.paths, 1272 resume = args.resume, 1273 use_cache = args.use_cache, 1274 batch_mode= args.batch, 1275 filename = args.filename, 1276 reporter = reporter, 1277 api_client = api_client, 1278 num_retries = args.retries, 1279 replication_desired = args.replication, 1280 put_threads = args.threads, 1281 name = collection_name, 1282 owner_uuid = project_uuid, 1283 ensure_unique_name = True, 1284 update_collection = args.update_collection, 1285 storage_classes=args.storage_classes, 1286 logger=logger, 1287 dry_run=args.dry_run, 1288 follow_links=args.follow_links, 1289 exclude_paths=exclude_paths, 1290 exclude_names=exclude_names, 1291 trash_at=trash_at) 1292 except ResumeCacheConflict: 1293 logger.error("\n".join([ 1294 "arv-put: Another process is already uploading this data.", 1295 " Use --no-cache if this is really what you want."])) 1296 sys.exit(1) 1297 except ResumeCacheInvalidError: 1298 logger.error("\n".join([ 1299 "arv-put: Resume cache contains invalid signature: it may have expired", 1300 " or been created with another Arvados user's credentials.", 1301 " Switch user or use one of the following options to restart upload:", 1302 " --no-resume to start a new resume cache.", 1303 " --no-cache to disable resume cache.", 1304 " --batch to ignore the resume cache if invalid."])) 1305 sys.exit(1) 1306 except (CollectionUpdateError, PathDoesNotExistError) as error: 1307 logger.error("\n".join([ 1308 "arv-put: %s" % str(error)])) 1309 sys.exit(1) 1310 except ArvPutUploadIsPending: 1311 # Dry run check successful, return proper exit code. 1312 sys.exit(2) 1313 except ArvPutUploadNotPending: 1314 # No files pending for upload 1315 sys.exit(0) 1316 1317 if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0: 1318 logger.warning("\n".join([ 1319 "arv-put: Resuming previous upload from last checkpoint.", 1320 " Use the --no-resume option to start over."])) 1321 1322 if not args.dry_run: 1323 writer.report_progress() 1324 output = None 1325 try: 1326 writer.start(save_collection=not(args.stream or args.raw)) 1327 except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error: 1328 logger.error("\n".join([ 1329 "arv-put: %s" % str(error)])) 1330 sys.exit(1) 1331 1332 if args.progress: # Print newline to split stderr from stdout for humans. 1333 logger.info("\n") 1334 1335 if args.stream: 1336 if args.normalize: 1337 output = writer.manifest_text(normalize=True) 1338 else: 1339 output = writer.manifest_text() 1340 elif args.raw: 1341 output = ','.join(writer.data_locators()) 1342 elif writer.manifest_locator() is not None: 1343 try: 1344 expiration_notice = "" 1345 if writer.collection_trash_at() is not None: 1346 # Get the local timezone-naive version, and log it with timezone information. 1347 if time.daylight: 1348 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone) 1349 else: 1350 local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone) 1351 expiration_notice = ". 
It will expire on {} {}.".format( 1352 local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z")) 1353 if args.update_collection: 1354 logger.info(u"Collection updated: '{}'{}".format( 1355 writer.collection_name(), expiration_notice)) 1356 else: 1357 logger.info(u"Collection saved as '{}'{}".format( 1358 writer.collection_name(), expiration_notice)) 1359 if args.portable_data_hash: 1360 output = writer.portable_data_hash() 1361 else: 1362 output = writer.manifest_locator() 1363 except apiclient_errors.Error as error: 1364 logger.error( 1365 "arv-put: Error creating Collection on project: {}.".format( 1366 error)) 1367 status = 1 1368 else: 1369 status = 1 1370 1371 # Print the locator (uuid) of the new collection. 1372 if output is None: 1373 status = status or 1 1374 elif not args.silent: 1375 stdout.write(output) 1376 if not output.endswith('\n'): 1377 stdout.write('\n') 1378 1379 if install_sig_handlers: 1380 arv_cmd.restore_signal_handlers() 1381 1382 if status != 0: 1383 sys.exit(status) 1384 1385 # Success! 1386 return output
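main() can also be called programmatically: it parses the argument list, performs the upload, writes the resulting locator to stdout unless --silent was given, and returns that locator (calling sys.exit() on errors and for the --dry-run exit codes). A hedged sketch, assuming a configured Arvados client; the file name is illustrative.

import io
from arvados.commands.put import main

out = io.StringIO()
locator = main(['--no-progress', 'results.csv'],
               stdout=out, install_sig_handlers=False)
print(locator)   # collection UUID, or the PDH when --portable-data-hash is used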