Source Code for Module arvados.commands.put

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")
upload_opts.add_argument('--storage-classes', help="""
Specify a comma-separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads will increase the RAM requirements. Default is to use
2 threads.
On high-latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")
run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                      action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern such as 'subdir/*.txt', all text files inside the
'subdir' directory, relative to the provided input dirs, will be excluded.
When using a filename pattern such as '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args
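# A hypothetical parse_arguments() call illustrating the stdin defaults
# applied above (the attribute names are the ones defined by
# upload_opts/run_opts; the flag choice here is arbitrary):
#
#   args = parse_arguments(['--no-progress'])
#   assert args.paths == ['-']          # no paths given: read standard input
#   assert args.filename == 'stdin'     # manifest entry name for stdin data
#   assert args.resume is False         # resuming makes no sense for stdin
#   assert args.use_cache is False      # neither does upload-state caching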


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
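# Illustrative behavior sketch: with dry_run=True, the first attempt to
# queue a file raises ArvPutUploadIsPending, which is how a dry run
# signals that uploads are pending (and triggers exit code 2 in main()).
#
#   files = FileUploadList(dry_run=True)
#   files.append(('/tmp/example', 0, 'example'))  # raises ArvPutUploadIsPending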


# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
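# Usage sketch (the request id below is hypothetical): the formatter
# appends the X-Request-Id only to the first DEBUG/ERROR record, so the
# id gets logged once without cluttering every subsequent line.
#
#   handler = logging.StreamHandler()
#   handler.setFormatter(ArvPutLogFormatter('req-0123456789abcdefghijk'))
#   log = logging.getLogger('arvados.arv_put')
#   log.addHandler(handler)
#   log.error('boom')   # first ERROR: message ends with (X-Request-Id: ...)
#   log.error('again')  # later records use the standard format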


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
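# Minimal usage sketch, assuming parsed args from parse_arguments() and a
# writable home config dir: make_path() derives a per-invocation cache
# path, and under typical Linux flock() semantics a second concurrent
# ResumeCache on the same path raises ResumeCacheConflict.
#
#   cache = ResumeCache(ResumeCache.make_path(args))
#   cache.save({'_finished_streams': []})   # atomically replaces the file
#   state = cache.load()
#   cache.close()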


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None, filename=None,
                 update_time=60.0, update_collection=None, storage_classes=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before counting starts, if no special files
        # are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self.storage_classes = storage_classes
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding files & dirs
        if requested, and build the upload file list.
        """
        # If there aren't special files to be read, reset the total bytes
        # count to zero to start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                       os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                     os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, notify that
        # there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so
        # the bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and not f in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(storage_classes=self.storage_classes,
                                         num_retries=self.num_retries)
        else:
            if self.storage_classes is None:
                self.storage_classes = ['default']
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                storage_classes=self.storage_classes,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data on this file, store it for an
            # eventual repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change
                # whenever we have an API for refreshing tokens.
                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode; close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection, api_client=self._api_client)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were
            # modified remotely.
            self._local_collection = arvados.collection.Collection(
                self._state['manifest'],
                replication_desired=self.replication_desired,
                put_threads=self.put_threads,
                api_client=self._api_client)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() itself failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the
# filesystem.
# Note: fnmatch() doesn't work correctly when used with pathnames. For
# example the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we're using it on every path
# component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
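# Illustrative matches, following the per-component logic above:
#
#   pathname_match('tests/run_test.py', 'tests/*.py')         # True
#   pathname_match('tests/subdir/run_test.py', 'tests/*.py')  # False: 3 components vs 2
#   pathname_match('subdir/file.gif', './subdir/file.gif')    # True: '.' components are dropped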

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress
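# Example outputs (illustrative values; _machine_format embeds sys.argv[0]
# and the pid at import time, here assumed to be 'arv-put' and 12345):
#
#   human_progress(512 << 20, 1024 << 20)  -> '\r512M / 1024M 50.0% '
#   machine_progress(1024, 2048)           -> 'arv-put 12345: 1024 written 2048 total\n'
#   machine_progress(1024, None)           -> 'arv-put 12345: 1024 written -1 total\n'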

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Split storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().split(',')
        if len(storage_classes) > 1:
            logger.error("Multiple storage classes are not supported currently.")
            sys.exit(1)

    # Set up the exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    # Success!
    return output

if __name__ == '__main__':
    main()
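# Typical invocations (illustrative; every flag used here is defined in
# upload_opts/run_opts above):
#
#   arv-put myfile.dat                       # upload a file, print the collection UUID
#   arv-put --portable-data-hash mydir/      # upload dir contents, print the PDH
#   arv-put --exclude '*.tmp' --no-resume mydir
#   cat data.bin | arv-put --filename data.bin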