Module arvados.commands.put

Functions

def desired_project_uuid(api_client, project_uuid, num_retries)
def human_progress(bytes_written, bytes_expected)
def machine_progress(bytes_written, bytes_expected)
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr, install_sig_handlers=True)
def parse_arguments(arguments)
def pathname_match(pathname, pattern)
def progress_writer(progress_func, outfile=sys.stderr)
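
These functions implement the arv-put command line tool, but main() can also be driven programmatically. Below is a minimal, hedged sketch of such a call; the argument list and file path are illustrative, and the exact output and return value depend on the installed arvados-python-client version. It assumes ARVADOS_API_HOST/ARVADOS_API_TOKEN are already configured in the environment.

import io
import sys

from arvados.commands import put as arv_put

# Capture what arv-put would normally print instead of letting it go to the
# real stdout/stderr.
out, err = io.StringIO(), io.StringIO()

result = arv_put.main(
    arguments=['--no-resume', '--no-cache', '/tmp/example.txt'],  # illustrative
    stdout=out,
    stderr=err,
    install_sig_handlers=False,  # keep the caller's signal handlers in place
)

# On success the new collection's identifier is written to the given stdout
# object; on failure main() exits with a non-zero status.
print('arv-put output:', out.getvalue().strip(), file=sys.stderr)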

Classes

class ArvPutArgumentConflict (*args, **kwargs)

Common base class for all non-exit exceptions.

Source code
class ArvPutArgumentConflict(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

class ArvPutLogFormatter (request_id)

Formatter instances are used to convert a LogRecord to text.

Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, "%(message)s", "{message}", or "${message}", is used.

The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user's message and arguments are pre-formatted into a LogRecord's message attribute. Currently, the useful attributes in a LogRecord are described by:

  • %(name)s: Name of the logger (logging channel)
  • %(levelno)s: Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL)
  • %(levelname)s: Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
  • %(pathname)s: Full pathname of the source file where the logging call was issued (if available)
  • %(filename)s: Filename portion of pathname
  • %(module)s: Module (name portion of filename)
  • %(lineno)d: Source line number where the logging call was issued (if available)
  • %(funcName)s: Function name
  • %(created)f: Time when the LogRecord was created (time.time() return value)
  • %(asctime)s: Textual time when the LogRecord was created
  • %(msecs)d: Millisecond portion of the creation time
  • %(relativeCreated)d: Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time)
  • %(thread)d: Thread ID (if available)
  • %(threadName)s: Thread name (if available)
  • %(process)d: Process ID (if available)
  • %(message)s: The result of record.getMessage(), computed just as the record is emitted

Initialize the formatter with specified format strings.

Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.

Use a style parameter of '%', '{' or '$' to specify that you want to use one of %-formatting, str.format() ({}) formatting or string.Template formatting in your format string.

Changed in version 3.2: Added the style parameter.

Source code
class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)

Ancestors

  • logging.Formatter

Class variables

var err_fmtr
var request_id_informed
var std_fmtr

Methods

def format(self, record)

Format the specified record as text.

The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
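
As the source above shows, this subclass tags the first DEBUG or ERROR record it formats with the X-Request-Id and falls back to the standard Arvados log format for everything else. A small sketch of wiring it to a handler follows; the request id string is illustrative (arv-put normally generates one per invocation):

import logging

from arvados.commands.put import ArvPutLogFormatter

logger = logging.getLogger('arvados.arv_put')
handler = logging.StreamHandler()
handler.setFormatter(ArvPutLogFormatter('req-example-123'))  # illustrative id
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

logger.error('first error')   # formatted with the X-Request-Id suffix
logger.error('second error')  # formatted with the plain Arvados log format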

class ArvPutUploadIsPending (*args, **kwargs)

Common base class for all non-exit exceptions.

Source code
class ArvPutUploadIsPending(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

class ArvPutUploadJob (paths, resume=True, use_cache=True, reporter=None, name=None, owner_uuid=None, api_client=None, batch_mode=False, ensure_unique_name=False, num_retries=None, put_threads=None, replication_desired=None, filename=None, update_time=60.0, update_collection=None, storage_classes=None, logger=<Logger arvados.arv_put (WARNING)>, dry_run=False, follow_links=True, exclude_paths=[], exclude_names=None, trash_at=None)
Source code
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None, filename=None,
                 update_time=60.0, update_collection=None, storage_classes=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None,
                 trash_at=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.batch_mode = batch_mode
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before we start counting, if no special files are
        # going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self.storage_classes = storage_classes
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names
        self._trash_at = trash_at

        if self._trash_at is not None:
            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
                raise TypeError('provided trash_at datetime should be timezone-naive')

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to build the upload file list and count the bytes
        expected to be uploaded, excluding any files and dirs requested to be excluded.
        """
        # If there aren't any special files to be read, reset the total bytes count
        # to zero so counting can start.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
            elif (not self.follow_links) and os.path.islink(path):
                self.logger.warning("Skipping symlink '{}'".format(path))
                continue
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                           os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                         os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        if not os.path.isfile(filepath):
                            self.logger.warning("Skipping non-regular file '{}'".format(filepath))
                            continue
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, we should notify that
        # there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and not f in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def _collection_trash_at(self):
        """
        Returns the trash date that the collection should use at save time.
        Takes into account absolute/relative trash_at values requested
        by the user.
        """
        if type(self._trash_at) == datetime.timedelta:
            # Get an absolute datetime for trash_at
            return datetime.datetime.utcnow() + self._trash_at
        return self._trash_at

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries,
                                         trash_at=self._collection_trash_at())
        else:
            if len(self._local_collection) == 0:
                self.logger.warning("No files were uploaded, skipping collection creation.")
                return
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries,
                trash_at=self._collection_trash_at())

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
            # Keep remote collection's trash_at attribute synced when using relative expire dates
            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
                try:
                    self._api_client.collections().update(
                        uuid=self._remote_collection.manifest_locator(),
                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
                    ).execute(num_retries=self.num_retries)
                except Exception as e:
                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin.buffer, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode; close the cache
                # file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _get_cache_filepath(self):
        # Set up cache file name from input paths.
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in self.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if self.filename:
            md5.update(self.filename.encode())
        cache_filename = md5.hexdigest()
        cache_filepath = os.path.join(
            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
            cache_filename)
        return cache_filepath

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection,
                    api_client=self._api_client,
                    storage_classes_desired=self.storage_classes,
                    num_retries=self.num_retries)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            cache_filepath = self._get_cache_filepath()
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            if not self._cached_manifest_valid():
                if not self.batch_mode:
                    raise ResumeCacheInvalidError()
                else:
                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
                    self.use_cache = False # Don't overwrite preexisting cache file.
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(
                self._state['manifest'],
                replication_desired=self.replication_desired,
                storage_classes_desired=self.storage_classes,
                put_threads=self.put_threads,
                api_client=self._api_client,
                num_retries=self.num_retries)

    def _cached_manifest_valid(self):
        """
        Validate the oldest non-expired block signature to check whether the cached
        manifest is usable, that is, whether it was created with the same Arvados
        account.
        """
        if self._state.get('manifest', None) is None:
            # No cached manifest yet, all good.
            return True
        now = datetime.datetime.utcnow()
        oldest_exp = None
        oldest_loc = None
        block_found = False
        for m in keep_locator_pattern.finditer(self._state['manifest']):
            loc = m.group(0)
            try:
                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
            except IndexError:
                # Locator without signature
                continue
            block_found = True
            if exp > now and (oldest_exp is None or exp < oldest_exp):
                oldest_exp = exp
                oldest_loc = loc
        if not block_found:
            # No block signatures found => no invalid block signatures.
            return True
        if oldest_loc is None:
            # Locator signatures found, but all have expired.
            # Reset the cache and move on.
            self.logger.info('Cache expired, starting from scratch.')
            self._state['manifest'] = ''
            return True
        kc = arvados.KeepClient(api_client=self._api_client,
                                num_retries=self.num_retries)
        try:
            kc.head(oldest_loc)
        except arvados.errors.KeepRequestError:
            # Something is wrong, cached manifest is not valid.
            return False
        return True

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively go through the entire collection `col`"""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def collection_trash_at(self):
        return self._my_collection().get_trash_at()

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

Class variables

var CACHE_DIR
var EMPTY_STATE

Methods

def collection_file_paths(self, col, path_prefix='.')

Return a list of file paths by recursively going through the entire collection col.

def collection_name(self)
def collection_trash_at(self)
def data_locators(self)
def destroy_cache(self)
def manifest_locator(self)
def manifest_text(self, stream_name='.', strip=False, normalize=False)
def portable_data_hash(self)
def report_progress(self)
def save_collection(self)
def start(self, save_collection)

Start supporting thread & file uploading
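
The constructor, start(), and the accessor methods above combine into a typical programmatic upload. The following is a hedged sketch only: the paths and collection name are illustrative, and it assumes the usual Arvados client environment is configured. Progress reporting reuses progress_writer() and human_progress() from this module, since the reporter callable is invoked with (bytes_written, bytes_expected).

import arvados
from arvados.commands import put as arv_put

api = arvados.api('v1')

uploader = arv_put.ArvPutUploadJob(
    ['/tmp/data', '/tmp/notes.txt'],          # illustrative local paths
    api_client=api,
    name='example upload',                    # illustrative collection name
    reporter=arv_put.progress_writer(arv_put.human_progress),
    resume=True,
    use_cache=True,
)

# Upload everything and save the resulting collection record.
uploader.start(save_collection=True)

print('collection:', uploader.manifest_locator())
print('portable data hash:', uploader.portable_data_hash())

# Once the upload has completed successfully, the resume cache can be dropped.
uploader.destroy_cache()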

class ArvPutUploadNotPending (*args, **kwargs)

Common base class for all non-exit exceptions.

Source code
class ArvPutUploadNotPending(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

class CollectionUpdateError (*args, **kwargs)

Common base class for all non-exit exceptions.

Source code
class CollectionUpdateError(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

class FileUploadList (dry_run=False)

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

Source code
class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

Ancestors

  • builtins.list

Methods

def append(self, other)

Append object to the end of the list. When the list was created with dry_run=True, append() raises ArvPutUploadIsPending instead of appending.
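
Because append() raises ArvPutUploadIsPending when dry_run is set, ArvPutUploadJob can use the list itself to detect that a dry run would have uploaded something. A small illustrative sketch (the tuple mirrors the (source, resume_offset, filename) shape appended in _check_file()):

from arvados.commands.put import ArvPutUploadIsPending, FileUploadList

files = FileUploadList(dry_run=True)
try:
    files.append(('/tmp/example.txt', 0, 'example.txt'))  # illustrative entry
except ArvPutUploadIsPending:
    print('dry run: at least one file would be uploaded')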

class PathDoesNotExistError (*args, **kwargs)

Common base class for all non-exit exceptions.

Source code
class PathDoesNotExistError(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

class ResumeCache (file_spec)
Source code
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

Class variables

var CACHE_DIR

Static methods

def make_path(args)

Methods

def check_cache(self, api_client=None, num_retries=0)
def close(self)
def destroy(self)
def load(self)
def restart(self)
def save(self, data)
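
ResumeCache is a simpler, self-contained resume helper: it derives a per-invocation cache file name from the parsed command-line arguments, locks that file, and stores JSON state in it. A hedged sketch of the lifecycle; the path given to parse_arguments() is illustrative:

from arvados.commands import put as arv_put

args = arv_put.parse_arguments(['/tmp/data'])     # illustrative arguments

cache_path = arv_put.ResumeCache.make_path(args)  # hash of API host, paths, filename
try:
    cache = arv_put.ResumeCache(cache_path)       # opens and flock()s the file
except arv_put.ResumeCacheConflict:
    raise SystemExit('another arv-put invocation is using this cache')

try:
    state = cache.load()                          # ValueError if the cache is empty
except ValueError:
    state = {}

cache.save({'example': 'state'})                  # rewrites the cache via a temp file
cache.close()
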
class ResumeCacheConflict (*args, **kwargs)

Common base class for all non-exit exceptions.

Source code
class ResumeCacheConflict(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

class ResumeCacheInvalidError (*args, **kwargs)

Common base class for all non-exit exceptions.

Source code
class ResumeCacheInvalidError(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException