Module arvados.commands.put

Functions

def desired_project_uuid(api_client, project_uuid, num_retries)
Expand source code
def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
def human_progress(bytes_written, bytes_expected)
Expand source code
def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
def machine_progress(bytes_written, bytes_expected)
Expand source code
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr, install_sig_handlers=True)
Expand source code
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Trash arguments validation
    trash_at = None
    if args.trash_at is not None:
        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
        # make sure the user provides a complete YYYY-MM-DD date.
        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        # Check if no time information was provided. In that case, assume end-of-day.
        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
            args.trash_at += 'T23:59:59'
        try:
            trash_at = ciso8601.parse_datetime(args.trash_at)
        except:
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        else:
            if trash_at.tzinfo is not None:
                # Timezone aware datetime provided.
                utcoffset = -trash_at.utcoffset()
            else:
                # Timezone naive datetime provided. Assume is local.
                if time.daylight:
                    utcoffset = datetime.timedelta(seconds=time.altzone)
                else:
                    utcoffset = datetime.timedelta(seconds=time.timezone)
            # Convert to UTC timezone naive datetime.
            trash_at = trash_at.replace(tzinfo=None) + utcoffset

        if trash_at <= datetime.datetime.utcnow():
            logger.error("--trash-at argument must be set in the future")
            sys.exit(1)
    if args.trash_after is not None:
        if args.trash_after < 1:
            logger.error("--trash-after argument must be >= 1")
            sys.exit(1)
        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    #  Split storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().replace(' ', '').split(',')

    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative paths patterns allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support of path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include or '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 batch_mode= args.batch,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names,
                                 trash_at=trash_at)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except ResumeCacheInvalidError:
        logger.error("\n".join([
            "arv-put: Resume cache contains invalid signature: it may have expired",
            "         or been created with another Arvados user's credentials.",
            "         Switch user or use one of the following options to restart upload:",
            "         --no-resume to start a new resume cache.",
            "         --no-cache to disable resume cache.",
            "         --batch to ignore the resume cache if invalid."]))
        sys.exit(1)
    except (CollectionUpdateError, PathDoesNotExistError) as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    elif writer.manifest_locator() is not None:
        try:
            expiration_notice = ""
            if writer.collection_trash_at() is not None:
                # Get the local timezone-naive version, and log it with timezone information.
                if time.daylight:
                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
                else:
                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
                expiration_notice = ". It will expire on {} {}.".format(
                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
            if args.update_collection:
                logger.info(u"Collection updated: '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            else:
                logger.info(u"Collection saved as '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1
    else:
        status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    # Success!
    return output
def parse_arguments(arguments)
Expand source code
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args
def pathname_match(pathname, pattern)
Expand source code
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
def progress_writer(progress_func, outfile=sys.stderr)
Expand source code
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

Classes

class ArvPutArgumentConflict (*args, **kwargs)
Expand source code
class ArvPutArgumentConflict(Exception):
    pass

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ArvPutLogFormatter (request_id)
Expand source code
class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)

Formatter instances are used to convert a LogRecord to text.

Formatters need to know how a LogRecord is constructed. They are responsible for converting a LogRecord to (usually) a string which can be interpreted by either a human or an external system. The base Formatter allows a formatting string to be specified. If none is supplied, the style-dependent default value, "%(message)s", "{message}", or "${message}", is used.

The Formatter can be initialized with a format string which makes use of knowledge of the LogRecord attributes - e.g. the default value mentioned above makes use of the fact that the user's message and arguments are pre- formatted into a LogRecord's message attribute. Currently, the useful attributes in a LogRecord are described by:

%(name)s Name of the logger (logging channel) %(levelno)s Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL) %(levelname)s Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL") %(pathname)s Full pathname of the source file where the logging call was issued (if available) %(filename)s Filename portion of pathname %(module)s Module (name portion of filename) %(lineno)d Source line number where the logging call was issued (if available) %(funcName)s Function name %(created)f Time when the LogRecord was created (time.time() return value) %(asctime)s Textual time when the LogRecord was created %(msecs)d Millisecond portion of the creation time %(relativeCreated)d Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time) %(thread)d Thread ID (if available) %(threadName)s Thread name (if available) %(process)d Process ID (if available) %(message)s The result of record.getMessage(), computed just as the record is emitted

Initialize the formatter with specified format strings.

Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.

Use a style parameter of '%', '{' or '$' to specify that you want to use one of %-formatting, :meth:str.format ({}) formatting or :class:string.Template formatting in your format string.

Changed in version: 3.2

Added the style parameter.

Ancestors

  • logging.Formatter

Class variables

var err_fmtr
var request_id_informed
var std_fmtr

Methods

def format(self, record)
Expand source code
def format(self, record):
    if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
        self.request_id_informed = True
        return self.err_fmtr.format(record)
    return self.std_fmtr.format(record)

Format the specified record as text.

The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

class ArvPutUploadIsPending (*args, **kwargs)
Expand source code
class ArvPutUploadIsPending(Exception):
    pass

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ArvPutUploadJob (paths,
resume=True,
use_cache=True,
reporter=None,
name=None,
owner_uuid=None,
api_client=None,
batch_mode=False,
ensure_unique_name=False,
num_retries=None,
put_threads=None,
replication_desired=None,
filename=None,
update_time=60.0,
update_collection=None,
storage_classes=None,
logger=<Logger arvados.arv_put (WARNING)>,
dry_run=False,
follow_links=True,
exclude_paths=[],
exclude_names=None,
trash_at=None)
Expand source code
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None, batch_mode=False,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None, filename=None,
                 update_time=60.0, update_collection=None, storage_classes=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None,
                 trash_at=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.batch_mode = batch_mode
        self.update = False
        self.reporter = reporter
        # This will set to 0 before start counting, if no special files are going
        # to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self.storage_classes = storage_classes
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names
        self._trash_at = trash_at

        if self._trash_at is not None:
            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
                raise TypeError('provided trash_at datetime should be timezone-naive')

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding requested files
        and dirs and building the upload file list.
        """
        # If there aren't special files to be read, reset total bytes count to zero
        # to start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exist
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
            elif (not self.follow_links) and os.path.islink(path):
                self.logger.warning("Skipping symlink '{}'".format(path))
                continue
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                           os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                         os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        if not os.path.isfile(filepath):
                            self.logger.warning("Skipping non-regular file '{}'".format(filepath))
                            continue
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-mode is on, and got up to this point, then we should notify that
        # there aren't any file to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and not f in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the catched
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def _collection_trash_at(self):
        """
        Returns the trash date that the collection should use at save time.
        Takes into account absolute/relative trash_at values requested
        by the user.
        """
        if type(self._trash_at) == datetime.timedelta:
            # Get an absolute datetime for trash_at
            return datetime.datetime.utcnow() + self._trash_at
        return self._trash_at

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File don't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exist on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exist on remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries,
                                         trash_at=self._collection_trash_at())
        else:
            if len(self._local_collection) == 0:
                self.logger.warning("No files were uploaded, skipping collection creation.")
                return
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries,
                trash_at=self._collection_trash_at())

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without comitting pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
            # Keep remote collection's trash_at attribute synced when using relative expire dates
            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
                try:
                    self._api_client.collections().update(
                        uuid=self._remote_collection.manifest_locator(),
                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
                    ).execute(num_retries=self.num_retries)
                except Exception as e:
                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin.buffer, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have a API for refreshing tokens.
                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running on dry-mode, close cache file to
                # avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _get_cache_filepath(self):
        # Set up cache file name from input paths.
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in self.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if self.filename:
            md5.update(self.filename.encode())
        cache_filename = md5.hexdigest()
        cache_filepath = os.path.join(
            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
            cache_filename)
        return cache_filepath

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection,
                    api_client=self._api_client,
                    storage_classes_desired=self.storage_classes,
                    num_retries=self.num_retries)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            cache_filepath = self._get_cache_filepath()
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with a empty cache file.
                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            if not self._cached_manifest_valid():
                if not self.batch_mode:
                    raise ResumeCacheInvalidError()
                else:
                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
                    self.use_cache = False # Don't overwrite preexisting cache file.
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(
                self._state['manifest'],
                replication_desired=self.replication_desired,
                storage_classes_desired=self.storage_classes,
                put_threads=self.put_threads,
                api_client=self._api_client,
                num_retries=self.num_retries)

    def _cached_manifest_valid(self):
        """
        Validate the oldest non-expired block signature to check if cached manifest
        is usable: checking if the cached manifest was not created with a different
        arvados account.
        """
        if self._state.get('manifest', None) is None:
            # No cached manifest yet, all good.
            return True
        now = datetime.datetime.utcnow()
        oldest_exp = None
        oldest_loc = None
        block_found = False
        for m in keep_locator_pattern.finditer(self._state['manifest']):
            loc = m.group(0)
            try:
                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
            except IndexError:
                # Locator without signature
                continue
            block_found = True
            if exp > now and (oldest_exp is None or exp < oldest_exp):
                oldest_exp = exp
                oldest_loc = loc
        if not block_found:
            # No block signatures found => no invalid block signatures.
            return True
        if oldest_loc is None:
            # Locator signatures found, but all have expired.
            # Reset the cache and move on.
            self.logger.info('Cache expired, starting from scratch.')
            self._state['manifest'] = ''
            return True
        kc = arvados.KeepClient(api_client=self._api_client,
                                num_retries=self.num_retries)
        try:
            kc.head(oldest_loc)
        except arvados.errors.KeepRequestError:
            # Something is wrong, cached manifest is not valid.
            return False
        return True

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively go through the entire collection `col`"""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we're already needing JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def collection_trash_at(self):
        return self._my_collection().get_trash_at()

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

Class variables

var CACHE_DIR
var EMPTY_STATE

Methods

def collection_file_paths(self, col, path_prefix='.')
Expand source code
def collection_file_paths(self, col, path_prefix='.'):
    """Return a list of file paths by recursively go through the entire collection `col`"""
    file_paths = []
    for name, item in listitems(col):
        if isinstance(item, arvados.arvfile.ArvadosFile):
            file_paths.append(os.path.join(path_prefix, name))
        elif isinstance(item, arvados.collection.Subcollection):
            new_prefix = os.path.join(path_prefix, name)
            file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
    return file_paths

Return a list of file paths by recursively go through the entire collection col

def collection_name(self)
Expand source code
def collection_name(self):
    return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
def collection_trash_at(self)
Expand source code
def collection_trash_at(self):
    return self._my_collection().get_trash_at()
def data_locators(self)
Expand source code
def data_locators(self):
    with self._collection_lock:
        # Make sure all datablocks are flushed before getting the locators
        self._my_collection().manifest_text()
        datablocks = self._datablocks_on_item(self._my_collection())
    return datablocks
def destroy_cache(self)
Expand source code
def destroy_cache(self):
    if self.use_cache:
        try:
            os.unlink(self._cache_filename)
        except OSError as error:
            # That's what we wanted anyway.
            if error.errno != errno.ENOENT:
                raise
        self._cache_file.close()
def manifest_locator(self)
Expand source code
def manifest_locator(self):
    return self._my_collection().manifest_locator()
def manifest_text(self, stream_name='.', strip=False, normalize=False)
Expand source code
def manifest_text(self, stream_name=".", strip=False, normalize=False):
    return self._my_collection().manifest_text(stream_name, strip, normalize)
def portable_data_hash(self)
Expand source code
def portable_data_hash(self):
    pdh = self._my_collection().portable_data_hash()
    m = self._my_collection().stripped_manifest().encode()
    local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
    if pdh != local_pdh:
        self.logger.warning("\n".join([
            "arv-put: API server provided PDH differs from local manifest.",
            "         This should not happen; showing API server version."]))
    return pdh
def report_progress(self)
Expand source code
def report_progress(self):
    if self.reporter is not None:
        self.reporter(self.bytes_written, self.bytes_expected)
def save_collection(self)
Expand source code
def save_collection(self):
    if self.update:
        # Check if files should be updated on the remote collection.
        for fp in self._file_paths:
            remote_file = self._remote_collection.find(fp)
            if not remote_file:
                # File don't exist on remote collection, copy it.
                self._remote_collection.copy(fp, fp, self._local_collection)
            elif remote_file != self._local_collection.find(fp):
                # A different file exist on remote collection, overwrite it.
                self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
            else:
                # The file already exist on remote collection, skip it.
                pass
        self._remote_collection.save(num_retries=self.num_retries,
                                     trash_at=self._collection_trash_at())
    else:
        if len(self._local_collection) == 0:
            self.logger.warning("No files were uploaded, skipping collection creation.")
            return
        self._local_collection.save_new(
            name=self.name, owner_uuid=self.owner_uuid,
            ensure_unique_name=self.ensure_unique_name,
            num_retries=self.num_retries,
            trash_at=self._collection_trash_at())
def start(self, save_collection)
Expand source code
def start(self, save_collection):
    """
    Start supporting thread & file uploading
    """
    self._checkpointer.start()
    try:
        # Update bytes_written from current local collection and
        # report initial progress.
        self._update()
        # Actual file upload
        self._upload_started = True # Used by the update thread to start checkpointing
        self._upload_files()
    except (SystemExit, Exception) as e:
        self._checkpoint_before_quit = False
        # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
        # Note: We're expecting SystemExit instead of
        # KeyboardInterrupt because we have a custom signal
        # handler in place that raises SystemExit with the catched
        # signal's code.
        if isinstance(e, PathDoesNotExistError):
            # We aren't interested in the traceback for this case
            pass
        elif not isinstance(e, SystemExit) or e.code != -2:
            self.logger.warning("Abnormal termination:\n{}".format(
                traceback.format_exc()))
        raise
    finally:
        if not self.dry_run:
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
            if self._checkpoint_before_quit:
                # Commit all pending blocks & one last _update()
                self._local_collection.manifest_text()
                self._update(final=True)
                if save_collection:
                    self.save_collection()
        if self.use_cache:
            self._cache_file.close()

Start supporting thread & file uploading

class ArvPutUploadNotPending (*args, **kwargs)
Expand source code
class ArvPutUploadNotPending(Exception):
    pass

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class CollectionUpdateError (*args, **kwargs)
Expand source code
class CollectionUpdateError(Exception):
    pass

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class FileUploadList (dry_run=False)
Expand source code
class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

Ancestors

  • builtins.list

Methods

def append(self, other)
Expand source code
def append(self, other):
    if self.dry_run:
        raise ArvPutUploadIsPending()
    super(FileUploadList, self).append(other)

Append object to the end of the list.

class PathDoesNotExistError (*args, **kwargs)
Expand source code
class PathDoesNotExistError(Exception):
    pass

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ResumeCache (file_spec)
Expand source code
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

Class variables

var CACHE_DIR

Static methods

def make_path(args)

Methods

def check_cache(self, api_client=None, num_retries=0)
Expand source code
def check_cache(self, api_client=None, num_retries=0):
    try:
        state = self.load()
        locator = None
        try:
            if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                locator = state["_finished_streams"][0][1][0]
            elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                locator = state["_current_stream_locators"][0]
            if locator is not None:
                kc = arvados.keep.KeepClient(api_client=api_client)
                kc.head(locator, num_retries=num_retries)
        except Exception as e:
            self.restart()
    except (ValueError):
        pass
def close(self)
Expand source code
def close(self):
    self.cache_file.close()
def destroy(self)
Expand source code
def destroy(self):
    try:
        os.unlink(self.filename)
    except OSError as error:
        if error.errno != errno.ENOENT:  # That's what we wanted anyway.
            raise
    self.close()
def load(self)
Expand source code
def load(self):
    self.cache_file.seek(0)
    return json.load(self.cache_file)
def restart(self)
Expand source code
def restart(self):
    self.destroy()
    self.__init__(self.filename)
def save(self, data)
Expand source code
def save(self, data):
    try:
        new_cache_fd, new_cache_name = tempfile.mkstemp(
            dir=os.path.dirname(self.filename))
        self._lock_file(new_cache_fd)
        new_cache = os.fdopen(new_cache_fd, 'r+')
        json.dump(data, new_cache)
        os.rename(new_cache_name, self.filename)
    except (IOError, OSError, ResumeCacheConflict):
        try:
            os.unlink(new_cache_name)
        except NameError:  # mkstemp failed.
            pass
    else:
        self.cache_file.close()
        self.cache_file = new_cache
class ResumeCacheConflict (*args, **kwargs)
Expand source code
class ResumeCacheConflict(Exception):
    pass

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ResumeCacheInvalidError (*args, **kwargs)
Expand source code
class ResumeCacheInvalidError(Exception):
    pass

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException