Module arvados.commands.put
Functions
def desired_project_uuid(api_client, project_uuid, num_retries)
def human_progress(bytes_written, bytes_expected)
def machine_progress(bytes_written, bytes_expected)
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr, install_sig_handlers=True)
def parse_arguments(arguments)
def pathname_match(pathname, pattern)
def progress_writer(progress_func, outfile=sys.stderr)
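The functions above are the whole command-line surface of arv-put. A hedged sketch (not taken from this page) of how they might be combined: parse_arguments() builds the argparse namespace that main() uses, and progress_writer() turns human_progress or machine_progress into a reporter callback. The file name example.txt and the flag combination are assumptions for illustration, and main() needs a configured Arvados API host and token to actually upload anything.

    import sys
    from arvados.commands import put as arv_put

    # Hypothetical command line: upload one file without resume caching.
    argv = ['--no-resume', '--no-cache', 'example.txt']

    # parse_arguments() returns the argparse namespace that main() would build
    # from the same list; handy for inspecting what was requested.
    args = arv_put.parse_arguments(argv)
    print(args.paths)

    # progress_writer() wraps a formatting function such as human_progress or
    # machine_progress into a callback that writes to the given stream.
    reporter = arv_put.progress_writer(arv_put.human_progress, outfile=sys.stderr)
    reporter(1024, 4096)  # report 1 KiB written out of 4 KiB expected

    # main() performs the upload and writes the new collection's identifier
    # to the stdout stream it is given.
    arv_put.main(argv, stdout=sys.stdout, stderr=sys.stderr)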
Classes
class ArvPutArgumentConflict (*args, **kwargs)
-
Common base class for all non-exit exceptions.
class ArvPutArgumentConflict(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
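As the ArvPutUploadJob source further down shows, this exception is raised for mutually exclusive options, for example asking to resume a previous upload while disabling the local cache that resuming depends on. A minimal sketch (the path data.txt is a placeholder; the constructor raises before touching the filesystem or the API):

    from arvados.commands.put import ArvPutArgumentConflict, ArvPutUploadJob

    try:
        # resume=True needs the on-disk cache, so use_cache=False conflicts.
        ArvPutUploadJob(['data.txt'], resume=True, use_cache=False)
    except ArvPutArgumentConflict as error:
        print("conflicting options:", error)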
class ArvPutLogFormatter (request_id)
-
Formatter instances are used to convert a LogRecord to text.
Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, "%(message)s", "{message}", or "${message}", is used.
The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user's message and arguments are pre-formatted into a LogRecord's message attribute. Currently, the useful attributes in a LogRecord are described by:
- %(name)s: Name of the logger (logging channel)
- %(levelno)s: Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL)
- %(levelname)s: Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
- %(pathname)s: Full pathname of the source file where the logging call was issued (if available)
- %(filename)s: Filename portion of pathname
- %(module)s: Module (name portion of filename)
- %(lineno)d: Source line number where the logging call was issued (if available)
- %(funcName)s: Function name
- %(created)f: Time when the LogRecord was created (time.time() return value)
- %(asctime)s: Textual time when the LogRecord was created
- %(msecs)d: Millisecond portion of the creation time
- %(relativeCreated)d: Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time)
- %(thread)d: Thread ID (if available)
- %(threadName)s: Thread name (if available)
- %(process)d: Process ID (if available)
- %(message)s: The result of record.getMessage(), computed just as the record is emitted
Initialize the formatter with specified format strings.
Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.
Use a style parameter of '%', '{' or '$' to specify that you want to use one of %-formatting, str.format() ({}) formatting, or string.Template formatting in your format string.
Changed in version 3.2: Added the style parameter.
class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format + ' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
Ancestors
- logging.Formatter
Class variables
var err_fmtr
var request_id_informed
var std_fmtr
Methods
def format(self, record)
-
Format the specified record as text.
The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
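A sketch of attaching this formatter to an ordinary logging handler. The request id string req-xyz123 is a made-up placeholder; per the source above, the first DEBUG or ERROR record is rendered with the X-Request-Id suffix and subsequent records fall back to the standard Arvados log format.

    import logging
    from arvados.commands.put import ArvPutLogFormatter

    handler = logging.StreamHandler()
    handler.setFormatter(ArvPutLogFormatter('req-xyz123'))  # placeholder request id

    logger = logging.getLogger('arvados.arv_put')
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)

    logger.error("first error: rendered with (X-Request-Id: req-xyz123)")
    logger.error("second error: standard format, request id already reported")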
class ArvPutUploadIsPending (*args, **kwargs)
-
Common base class for all non-exit exceptions.
class ArvPutUploadIsPending(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
class ArvPutUploadJob (paths, resume=True, use_cache=True, reporter=None, name=None, owner_uuid=None, api_client=None, batch_mode=False, ensure_unique_name=False, num_retries=None, put_threads=None, replication_desired=None, filename=None, update_time=60.0, update_collection=None, storage_classes=None, logger=logging.getLogger('arvados.arv_put'), dry_run=False, follow_links=True, exclude_paths=[], exclude_names=None, trash_at=None)
-
class ArvPutUploadJob(object): CACHE_DIR = '.cache/arvados/arv-put' EMPTY_STATE = { 'manifest' : None, # Last saved manifest checkpoint 'files' : {} # Previous run file list: {path : {size, mtime}} } def __init__(self, paths, resume=True, use_cache=True, reporter=None, name=None, owner_uuid=None, api_client=None, batch_mode=False, ensure_unique_name=False, num_retries=None, put_threads=None, replication_desired=None, filename=None, update_time=60.0, update_collection=None, storage_classes=None, logger=logging.getLogger('arvados.arv_put'), dry_run=False, follow_links=True, exclude_paths=[], exclude_names=None, trash_at=None): self.paths = paths self.resume = resume self.use_cache = use_cache self.batch_mode = batch_mode self.update = False self.reporter = reporter # This will set to 0 before start counting, if no special files are going # to be read. self.bytes_expected = None self.bytes_written = 0 self.bytes_skipped = 0 self.name = name self.owner_uuid = owner_uuid self.ensure_unique_name = ensure_unique_name self.num_retries = num_retries self.replication_desired = replication_desired self.put_threads = put_threads self.filename = filename self.storage_classes = storage_classes self._api_client = api_client self._state_lock = threading.Lock() self._state = None # Previous run state (file list & manifest) self._current_files = [] # Current run file list self._cache_file = None self._collection_lock = threading.Lock() self._remote_collection = None # Collection being updated (if asked) self._local_collection = None # Collection from previous run manifest self._file_paths = set() # Files to be updated in remote collection self._stop_checkpointer = threading.Event() self._checkpointer = threading.Thread(target=self._update_task) self._checkpointer.daemon = True self._update_task_time = update_time # How many seconds wait between update runs self._files_to_upload = FileUploadList(dry_run=dry_run) self._upload_started = False self.logger = logger self.dry_run = dry_run self._checkpoint_before_quit = True self.follow_links = follow_links self.exclude_paths = exclude_paths self.exclude_names = exclude_names self._trash_at = trash_at if self._trash_at is not None: if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]: raise TypeError('trash_at should be None, timezone-naive datetime or timedelta') if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None: raise TypeError('provided trash_at datetime should be timezone-naive') if not self.use_cache and self.resume: raise ArvPutArgumentConflict('resume cannot be True when use_cache is False') # Check for obvious dry-run responses if self.dry_run and (not self.use_cache or not self.resume): raise ArvPutUploadIsPending() # Load cached data if any and if needed self._setup_state(update_collection) # Build the upload file list, excluding requested files and counting the # bytes expected to be uploaded. self._build_upload_list() def _build_upload_list(self): """ Scan the requested paths to count file sizes, excluding requested files and dirs and building the upload file list. """ # If there aren't special files to be read, reset total bytes count to zero # to start counting. 
if not any([p for p in self.paths if not (os.path.isfile(p) or os.path.isdir(p))]): self.bytes_expected = 0 for path in self.paths: # Test for stdin first, in case some file named '-' exist if path == '-': if self.dry_run: raise ArvPutUploadIsPending() self._write_stdin(self.filename or 'stdin') elif not os.path.exists(path): raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path)) elif (not self.follow_links) and os.path.islink(path): self.logger.warning("Skipping symlink '{}'".format(path)) continue elif os.path.isdir(path): # Use absolute paths on cache index so CWD doesn't interfere # with the caching logic. orig_path = path path = os.path.abspath(path) if orig_path[-1:] == os.sep: # When passing a directory reference with a trailing slash, # its contents should be uploaded directly to the # collection's root. prefixdir = path else: # When passing a directory reference with no trailing slash, # upload the directory to the collection's root. prefixdir = os.path.dirname(path) prefixdir += os.sep for root, dirs, files in os.walk(path, followlinks=self.follow_links): root_relpath = os.path.relpath(root, path) if root_relpath == '.': root_relpath = '' # Exclude files/dirs by full path matching pattern if self.exclude_paths: dirs[:] = [d for d in dirs if not any(pathname_match( os.path.join(root_relpath, d), pat) for pat in self.exclude_paths)] files = [f for f in files if not any(pathname_match( os.path.join(root_relpath, f), pat) for pat in self.exclude_paths)] # Exclude files/dirs by name matching pattern if self.exclude_names is not None: dirs[:] = [d for d in dirs if not self.exclude_names.match(d)] files = [f for f in files if not self.exclude_names.match(f)] # Make os.walk()'s dir traversing order deterministic dirs.sort() files.sort() for f in files: filepath = os.path.join(root, f) if not os.path.isfile(filepath): self.logger.warning("Skipping non-regular file '{}'".format(filepath)) continue # Add its size to the total bytes count (if applicable) if self.follow_links or (not os.path.islink(filepath)): if self.bytes_expected is not None: self.bytes_expected += os.path.getsize(filepath) self._check_file(filepath, os.path.join(root[len(prefixdir):], f)) else: filepath = os.path.abspath(path) # Add its size to the total bytes count (if applicable) if self.follow_links or (not os.path.islink(filepath)): if self.bytes_expected is not None: self.bytes_expected += os.path.getsize(filepath) self._check_file(filepath, self.filename or os.path.basename(path)) # If dry-mode is on, and got up to this point, then we should notify that # there aren't any file to upload. if self.dry_run: raise ArvPutUploadNotPending() # Remove local_collection's files that don't exist locally anymore, so the # bytes_written count is correct. for f in self.collection_file_paths(self._local_collection, path_prefix=""): if f != 'stdin' and f != self.filename and not f in self._file_paths: self._local_collection.remove(f) def start(self, save_collection): """ Start supporting thread & file uploading """ self._checkpointer.start() try: # Update bytes_written from current local collection and # report initial progress. 
self._update() # Actual file upload self._upload_started = True # Used by the update thread to start checkpointing self._upload_files() except (SystemExit, Exception) as e: self._checkpoint_before_quit = False # Log stack trace only when Ctrl-C isn't pressed (SIGINT) # Note: We're expecting SystemExit instead of # KeyboardInterrupt because we have a custom signal # handler in place that raises SystemExit with the catched # signal's code. if isinstance(e, PathDoesNotExistError): # We aren't interested in the traceback for this case pass elif not isinstance(e, SystemExit) or e.code != -2: self.logger.warning("Abnormal termination:\n{}".format( traceback.format_exc())) raise finally: if not self.dry_run: # Stop the thread before doing anything else self._stop_checkpointer.set() self._checkpointer.join() if self._checkpoint_before_quit: # Commit all pending blocks & one last _update() self._local_collection.manifest_text() self._update(final=True) if save_collection: self.save_collection() if self.use_cache: self._cache_file.close() def _collection_trash_at(self): """ Returns the trash date that the collection should use at save time. Takes into account absolute/relative trash_at values requested by the user. """ if type(self._trash_at) == datetime.timedelta: # Get an absolute datetime for trash_at return datetime.datetime.utcnow() + self._trash_at return self._trash_at def save_collection(self): if self.update: # Check if files should be updated on the remote collection. for fp in self._file_paths: remote_file = self._remote_collection.find(fp) if not remote_file: # File don't exist on remote collection, copy it. self._remote_collection.copy(fp, fp, self._local_collection) elif remote_file != self._local_collection.find(fp): # A different file exist on remote collection, overwrite it. self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True) else: # The file already exist on remote collection, skip it. pass self._remote_collection.save(num_retries=self.num_retries, trash_at=self._collection_trash_at()) else: if len(self._local_collection) == 0: self.logger.warning("No files were uploaded, skipping collection creation.") return self._local_collection.save_new( name=self.name, owner_uuid=self.owner_uuid, ensure_unique_name=self.ensure_unique_name, num_retries=self.num_retries, trash_at=self._collection_trash_at()) def destroy_cache(self): if self.use_cache: try: os.unlink(self._cache_filename) except OSError as error: # That's what we wanted anyway. if error.errno != errno.ENOENT: raise self._cache_file.close() def _collection_size(self, collection): """ Recursively get the total size of the collection """ size = 0 for item in listvalues(collection): if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection): size += self._collection_size(item) else: size += item.size() return size def _update_task(self): """ Periodically called support task. File uploading is asynchronous so we poll status from the collection. """ while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time): self._update() def _update(self, final=False): """ Update cached manifest text and report progress. 
""" if self._upload_started: with self._collection_lock: self.bytes_written = self._collection_size(self._local_collection) if self.use_cache: if final: manifest = self._local_collection.manifest_text() else: # Get the manifest text without comitting pending blocks manifest = self._local_collection.manifest_text(strip=False, normalize=False, only_committed=True) # Update cache with self._state_lock: self._state['manifest'] = manifest if self.use_cache: try: self._save_state() except Exception as e: self.logger.error("Unexpected error trying to save cache file: {}".format(e)) # Keep remote collection's trash_at attribute synced when using relative expire dates if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta: try: self._api_client.collections().update( uuid=self._remote_collection.manifest_locator(), body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")} ).execute(num_retries=self.num_retries) except Exception as e: self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e)) else: self.bytes_written = self.bytes_skipped # Call the reporter, if any self.report_progress() def report_progress(self): if self.reporter is not None: self.reporter(self.bytes_written, self.bytes_expected) def _write_stdin(self, filename): output = self._local_collection.open(filename, 'wb') self._write(sys.stdin.buffer, output) output.close() def _check_file(self, source, filename): """ Check if this file needs to be uploaded """ # Ignore symlinks when requested if (not self.follow_links) and os.path.islink(source): return resume_offset = 0 should_upload = False new_file_in_cache = False # Record file path for updating the remote collection before exiting self._file_paths.add(filename) with self._state_lock: # If no previous cached data on this file, store it for an eventual # repeated run. if source not in self._state['files']: self._state['files'][source] = { 'mtime': os.path.getmtime(source), 'size' : os.path.getsize(source) } new_file_in_cache = True cached_file_data = self._state['files'][source] # Check if file was already uploaded (at least partially) file_in_local_collection = self._local_collection.find(filename) # If not resuming, upload the full file. if not self.resume: should_upload = True # New file detected from last run, upload it. elif new_file_in_cache: should_upload = True # Local file didn't change from last run. elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source): if not file_in_local_collection: # File not uploaded yet, upload it completely should_upload = True elif file_in_local_collection.permission_expired(): # Permission token expired, re-upload file. This will change whenever # we have a API for refreshing tokens. self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename)) should_upload = True self._local_collection.remove(filename) elif cached_file_data['size'] == file_in_local_collection.size(): # File already there, skip it. self.bytes_skipped += cached_file_data['size'] elif cached_file_data['size'] > file_in_local_collection.size(): # File partially uploaded, resume! 
resume_offset = file_in_local_collection.size() self.bytes_skipped += resume_offset should_upload = True else: # Inconsistent cache, re-upload the file should_upload = True self._local_collection.remove(filename) self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source)) # Local file differs from cached data, re-upload it. else: if file_in_local_collection: self._local_collection.remove(filename) should_upload = True if should_upload: try: self._files_to_upload.append((source, resume_offset, filename)) except ArvPutUploadIsPending: # This could happen when running on dry-mode, close cache file to # avoid locking issues. self._cache_file.close() raise def _upload_files(self): for source, resume_offset, filename in self._files_to_upload: with open(source, 'rb') as source_fd: with self._state_lock: self._state['files'][source]['mtime'] = os.path.getmtime(source) self._state['files'][source]['size'] = os.path.getsize(source) if resume_offset > 0: # Start upload where we left off output = self._local_collection.open(filename, 'ab') source_fd.seek(resume_offset) else: # Start from scratch output = self._local_collection.open(filename, 'wb') self._write(source_fd, output) output.close(flush=False) def _write(self, source_fd, output): while True: data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE) if not data: break output.write(data) def _my_collection(self): return self._remote_collection if self.update else self._local_collection def _get_cache_filepath(self): # Set up cache file name from input paths. md5 = hashlib.md5() md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode()) realpaths = sorted(os.path.realpath(path) for path in self.paths) md5.update(b'\0'.join([p.encode() for p in realpaths])) if self.filename: md5.update(self.filename.encode()) cache_filename = md5.hexdigest() cache_filepath = os.path.join( arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'), cache_filename) return cache_filepath def _setup_state(self, update_collection): """ Create a new cache file or load a previously existing one. """ # Load an already existing collection for update if update_collection and re.match(arvados.util.collection_uuid_pattern, update_collection): try: self._remote_collection = arvados.collection.Collection( update_collection, api_client=self._api_client, storage_classes_desired=self.storage_classes, num_retries=self.num_retries) except arvados.errors.ApiError as error: raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error)) else: self.update = True elif update_collection: # Collection locator provided, but unknown format raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection)) if self.use_cache: cache_filepath = self._get_cache_filepath() if self.resume and os.path.exists(cache_filepath): self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath)) self._cache_file = open(cache_filepath, 'a+') else: # --no-resume means start with a empty cache file. 
self.logger.info(u"Creating new cache file at {}".format(cache_filepath)) self._cache_file = open(cache_filepath, 'w+') self._cache_filename = self._cache_file.name self._lock_file(self._cache_file) self._cache_file.seek(0) with self._state_lock: if self.use_cache: try: self._state = json.load(self._cache_file) if not set(['manifest', 'files']).issubset(set(self._state.keys())): # Cache at least partially incomplete, set up new cache self._state = copy.deepcopy(self.EMPTY_STATE) except ValueError: # Cache file empty, set up new cache self._state = copy.deepcopy(self.EMPTY_STATE) else: self.logger.info("No cache usage requested for this run.") # No cache file, set empty state self._state = copy.deepcopy(self.EMPTY_STATE) if not self._cached_manifest_valid(): if not self.batch_mode: raise ResumeCacheInvalidError() else: self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name)) self.use_cache = False # Don't overwrite preexisting cache file. self._state = copy.deepcopy(self.EMPTY_STATE) # Load the previous manifest so we can check if files were modified remotely. self._local_collection = arvados.collection.Collection( self._state['manifest'], replication_desired=self.replication_desired, storage_classes_desired=self.storage_classes, put_threads=self.put_threads, api_client=self._api_client, num_retries=self.num_retries) def _cached_manifest_valid(self): """ Validate the oldest non-expired block signature to check if cached manifest is usable: checking if the cached manifest was not created with a different arvados account. """ if self._state.get('manifest', None) is None: # No cached manifest yet, all good. return True now = datetime.datetime.utcnow() oldest_exp = None oldest_loc = None block_found = False for m in keep_locator_pattern.finditer(self._state['manifest']): loc = m.group(0) try: exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16)) except IndexError: # Locator without signature continue block_found = True if exp > now and (oldest_exp is None or exp < oldest_exp): oldest_exp = exp oldest_loc = loc if not block_found: # No block signatures found => no invalid block signatures. return True if oldest_loc is None: # Locator signatures found, but all have expired. # Reset the cache and move on. self.logger.info('Cache expired, starting from scratch.') self._state['manifest'] = '' return True kc = arvados.KeepClient(api_client=self._api_client, num_retries=self.num_retries) try: kc.head(oldest_loc) except arvados.errors.KeepRequestError: # Something is wrong, cached manifest is not valid. return False return True def collection_file_paths(self, col, path_prefix='.'): """Return a list of file paths by recursively go through the entire collection `col`""" file_paths = [] for name, item in listitems(col): if isinstance(item, arvados.arvfile.ArvadosFile): file_paths.append(os.path.join(path_prefix, name)) elif isinstance(item, arvados.collection.Subcollection): new_prefix = os.path.join(path_prefix, name) file_paths += self.collection_file_paths(item, path_prefix=new_prefix) return file_paths def _lock_file(self, fileobj): try: fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: raise ResumeCacheConflict(u"{} locked".format(fileobj.name)) def _save_state(self): """ Atomically save current state into cache. """ with self._state_lock: # We're not using copy.deepcopy() here because it's a lot slower # than json.dumps(), and we're already needing JSON format to be # saved on disk. 
state = json.dumps(self._state) try: new_cache = tempfile.NamedTemporaryFile( mode='w+', dir=os.path.dirname(self._cache_filename), delete=False) self._lock_file(new_cache) new_cache.write(state) new_cache.flush() os.fsync(new_cache) os.rename(new_cache.name, self._cache_filename) except (IOError, OSError, ResumeCacheConflict) as error: self.logger.error("There was a problem while saving the cache file: {}".format(error)) try: os.unlink(new_cache_name) except NameError: # mkstemp failed. pass else: self._cache_file.close() self._cache_file = new_cache def collection_name(self): return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None def collection_trash_at(self): return self._my_collection().get_trash_at() def manifest_locator(self): return self._my_collection().manifest_locator() def portable_data_hash(self): pdh = self._my_collection().portable_data_hash() m = self._my_collection().stripped_manifest().encode() local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m)) if pdh != local_pdh: self.logger.warning("\n".join([ "arv-put: API server provided PDH differs from local manifest.", " This should not happen; showing API server version."])) return pdh def manifest_text(self, stream_name=".", strip=False, normalize=False): return self._my_collection().manifest_text(stream_name, strip, normalize) def _datablocks_on_item(self, item): """ Return a list of datablock locators, recursively navigating through subcollections """ if isinstance(item, arvados.arvfile.ArvadosFile): if item.size() == 0: # Empty file locator return ["d41d8cd98f00b204e9800998ecf8427e+0"] else: locators = [] for segment in item.segments(): loc = segment.locator locators.append(loc) return locators elif isinstance(item, arvados.collection.Collection): l = [self._datablocks_on_item(x) for x in listvalues(item)] # Fast list flattener method taken from: # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python return [loc for sublist in l for loc in sublist] else: return None def data_locators(self): with self._collection_lock: # Make sure all datablocks are flushed before getting the locators self._my_collection().manifest_text() datablocks = self._datablocks_on_item(self._my_collection()) return datablocks
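The _save_state() method near the end of the source above checkpoints the upload by dumping self._state to JSON into a temporary file and renaming it over the cache file, so an interrupted write can never corrupt an existing checkpoint. A standalone sketch of that same write-then-rename pattern, with hypothetical file names:

    import json
    import os
    import tempfile

    def save_state_atomically(state, cache_path):
        # Serialize up front; json.dumps() is cheaper than deep-copying the dict.
        payload = json.dumps(state)
        tmp = tempfile.NamedTemporaryFile(
            mode='w+', dir=os.path.dirname(cache_path) or '.', delete=False)
        try:
            tmp.write(payload)
            tmp.flush()
            os.fsync(tmp.fileno())
            # rename() is atomic on POSIX filesystems, so readers see either the
            # old checkpoint or the new one, never a half-written file.
            os.rename(tmp.name, cache_path)
        finally:
            tmp.close()

    save_state_atomically({'manifest': None, 'files': {}}, 'arv-put-example.cache')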
Class variables
var CACHE_DIR
var EMPTY_STATE
Methods
def collection_file_paths(self, col, path_prefix='.')
-
Return a list of file paths by recursively going through the entire collection col.
def collection_name(self)
def collection_trash_at(self)
def data_locators(self)
def destroy_cache(self)
def manifest_locator(self)
def manifest_text(self, stream_name='.', strip=False, normalize=False)
def portable_data_hash(self)
def report_progress(self)
def save_collection(self)
def start(self, save_collection)
-
Start supporting thread & file uploading
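A hedged sketch of the typical lifecycle, broadly mirroring what main() does: build the job over local paths, start() it to launch the checkpointing thread and upload the files, then read back the collection identifiers. The directory name, collection name, and retry count are assumptions for illustration, and a configured Arvados client environment is required for the upload to succeed.

    import sys
    from arvados.commands import put as arv_put

    job = arv_put.ArvPutUploadJob(
        ['./results/'],          # trailing slash: upload the directory's contents at the collection root
        name='example upload',   # assumed collection name
        ensure_unique_name=True,
        reporter=arv_put.progress_writer(arv_put.human_progress),
        num_retries=3,
    )

    try:
        job.start(save_collection=True)   # upload everything and save the new collection
    except arv_put.PathDoesNotExistError as error:
        sys.exit(str(error))

    print("collection:", job.manifest_locator())
    print("portable data hash:", job.portable_data_hash())
    job.destroy_cache()                   # drop the resume cache after a clean finish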
class ArvPutUploadNotPending (*args, **kwargs)
-
Common base class for all non-exit exceptions.
class ArvPutUploadNotPending(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
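Together with ArvPutUploadIsPending above, this exception implements arv-put's --dry-run protocol: with dry_run=True the ArvPutUploadJob constructor raises ArvPutUploadIsPending as soon as it finds data that would still be uploaded, and ArvPutUploadNotPending once everything has already been uploaded. A sketch of how a caller might use that (the directory name is hypothetical and an Arvados client environment is assumed):

    from arvados.commands import put as arv_put

    def upload_is_pending(paths):
        """Return True if resuming an arv-put of these paths would still transfer data."""
        try:
            # dry_run=True never uploads; the constructor only scans and then
            # raises one of the two signalling exceptions.
            arv_put.ArvPutUploadJob(paths, resume=True, use_cache=True, dry_run=True)
        except arv_put.ArvPutUploadIsPending:
            return True
        except arv_put.ArvPutUploadNotPending:
            return False

    print(upload_is_pending(['./results/']))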
class CollectionUpdateError (*args, **kwargs)
-
Common base class for all non-exit exceptions.
class CollectionUpdateError(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
class FileUploadList (dry_run=False)
-
Built-in mutable sequence.
If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.
class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
Ancestors
- builtins.list
Methods
def append(self, other)
-
Append object to the end of the list.
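A minimal illustration of the behaviour shown above: the list works normally until it is created with dry_run=True, in which case the first append() raises ArvPutUploadIsPending; that is how the upload job detects, during a dry run, that there is still something to send. The tuple layout matches what ArvPutUploadJob stores: (source path, resume offset, name in the collection).

    from arvados.commands.put import ArvPutUploadIsPending, FileUploadList

    uploads = FileUploadList(dry_run=False)
    uploads.append(('/tmp/data.bin', 0, 'data.bin'))
    print(len(uploads))   # -> 1

    try:
        FileUploadList(dry_run=True).append(('/tmp/data.bin', 0, 'data.bin'))
    except ArvPutUploadIsPending:
        print("dry run: an upload would be required")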
class PathDoesNotExistError (*args, **kwargs)
-
Common base class for all non-exit exceptions.
class PathDoesNotExistError(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
class ResumeCache (file_spec)
-
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)
Class variables
var CACHE_DIR
Static methods
def make_path(args)
Methods
def check_cache(self, api_client=None, num_retries=0)
def close(self)
def destroy(self)
def load(self)
def restart(self)
def save(self, data)
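A sketch of driving this class directly: make_path() derives a per-invocation cache filename from the API host and the requested paths, and constructing ResumeCache takes an exclusive lock on that file, raising ResumeCacheConflict if another arv-put already holds it. The command line below is a hypothetical example, not taken from this page.

    from arvados.commands import put as arv_put

    args = arv_put.parse_arguments(['example.txt'])   # hypothetical command line

    cache_path = arv_put.ResumeCache.make_path(args)
    try:
        cache = arv_put.ResumeCache(cache_path)
    except arv_put.ResumeCacheConflict:
        raise SystemExit("another arv-put invocation is already using this cache")

    cache.save({'_current_stream_locators': []})      # persist some JSON state atomically
    print(cache.load())                               # read it back
    cache.destroy()                                   # unlink the cache file and close it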
class ResumeCacheConflict (*args, **kwargs)
-
Common base class for all non-exit exceptions.
class ResumeCacheConflict(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
class ResumeCacheInvalidError (*args, **kwargs)
-
Common base class for all non-exit exceptions.
class ResumeCacheInvalidError(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException