Module arvados.arvfile

Functions

def must_be_writable(orig_func)
def split(path)

split(path) -> streamname, filename

Separate the stream name and file name in a /-separated stream path and return a tuple (stream_name, file_name). If no stream name is available, assume '.'.

def synchronized(orig_func)

Classes

class ArvadosFile (parent, name, stream=[], segments=[])

Represent a file in a Collection.

ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.

This object may be accessed from multiple threads.

ArvadosFile constructor.

:stream: a list of Range objects representing a block stream

:segments: a list of Range objects representing segments

Expand source code
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry')

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""

        for s in self._segments:
            if '+R' in s.locator:
                return True
        return False

    @synchronized
    def _copy_remote_blocks(self, remote_blocks={}):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
            Shared cache of remote to local block mappings. This is used to avoid
            doing extra work when blocks are shared by more than one file in
            different subdirectories.
        """

        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the trucate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `offset` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                 segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

Instance variables

var fuse_entry
var lock
var name
var parent

Methods

def add_segment(self, blocks, pos, size)

Add a segment to the end of the file.

pos and offset reference a section of the stream described by blocks (a list of Range objects)

def add_writer(self, writer)

Add an ArvadosFileWriter reference to the list of writers

def clone(self, new_parent, new_name)

Make a copy of this file.

def closed(self)

Get whether this is closed or not. When the writers list is empty, the file is supposed to be closed.

def committed(self)

Get whether this is committed or not.

def flush(self, sync=True, num_retries=0)

Flush the current bufferblock to Keep.

:sync: If True, commit block synchronously, wait until buffer block has been written. If False, commit block asynchronously, return immediately after putting block into the keep put queue.

def has_remote_blocks(self)

Returns True if any of the segment's locators has a +R signature

def manifest_text(self, stream_name='.', portable_locators=False, normalize=False, only_committed=False)
def permission_expired(self, as_of_dt=None)

Returns True if any of the segment's locators is expired

def readfrom(self, offset, size, num_retries, exact=False)

Read up to size bytes from the file starting at offset.

:exact: If False (default), return less data than requested if the read crosses a block boundary and the next block isn't cached. If True, only return less data than requested when hitting EOF.

def remove_writer(self, writer, flush)

Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks.

def replace_contents(self, other)

Replace segments of this file with segments from another ArvadosFile object.

def segments(self)
def set_committed(self, value=True)

Set committed flag.

If value is True, set committed to be True.

If value is False, set committed to be False for this and all parents.

def set_segments(self, segs)
def size(self)

Get the file size.

def truncate(self, size)

Shrink or expand the size of the file.

If size is less than the size of the file, the file contents after size will be discarded. If size is greater than the current size of the file, it will be filled with zero bytes.

def writable(self)
def writeto(self, offset, data, num_retries)

Write data to the file starting at offset.

This will update existing bytes and/or extend the size of the file as necessary.

class ArvadosFileReader (arvadosfile, mode='r', num_retries=None)

Wraps ArvadosFile in a file-like object supporting reading only.

Be aware that this class is NOT thread safe as there is no locking around updating file pointer.

Expand source code
class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass

Ancestors

Subclasses

Methods

def flush(self)
def read(self, size=None, num_retries=None)

Read up to size bytes from the file and return the result.

Starts at the current file position. If size is None, read the entire remainder of the file.

def readfrom(self, offset, size, num_retries=None)

Read up to size bytes from the stream, starting at the specified file offset.

This method does not change the file position.

def readinto(self, b)
def size(self)
def stream_name(self)
class ArvadosFileReaderBase (name, mode, num_retries=None)
Expand source code
class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub('\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

Ancestors

  • arvados.arvfile._FileLikeObjectBase

Subclasses

Methods

def decompress(self, decompress, size, num_retries=None)
def decompressed_name(self)
def read(self, size, num_retries=None)
def readable(self)
def readall(self, size=1048576, num_retries=None)
def readall_decompressed(self, size=1048576, num_retries=None)
def readfrom(self, start, size, num_retries=None)
def readline(self, size=inf, num_retries=None)
def readlines(self, sizehint=inf, num_retries=None)
def seek(self, pos, whence=0)
def seekable(self)
def size(self)
def tell(self)
def writable(self)
class ArvadosFileWriter (arvadosfile, mode, num_retries=None)

Wraps ArvadosFile in a file-like object supporting both reading and writing.

Be aware that this class is NOT thread safe as there is no locking around updating file pointer.

Expand source code
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()

Ancestors

Methods

def close(self, flush=True)
def flush(self)
def truncate(self, size=None)
def writable(self)
def write(self, data, num_retries=None)
def writelines(self, seq, num_retries=None)

Inherited members

class NoopLock
Expand source code
class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass

Methods

def acquire(self, blocking=False)
def release(self)
class StateChangeError (message, state, nextstate)

Common base class for all non-exit exceptions.

Expand source code
class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

Ancestors

  • builtins.Exception
  • builtins.BaseException
class StreamFileReader (stream, segments, name)
Expand source code
class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"

Ancestors

Methods

def as_manifest(self)
def read(self, size, num_retries=None)

Read up to 'size' bytes from the stream, starting at the current file position

def readfrom(self, start, size, num_retries=None)

Read up to 'size' bytes from the stream, starting at 'start'

def size(self)
def stream_name(self)
class UnownedBlockError (*args, **kwargs)

Raised when there's an writable block without an owner on the BlockManager.

Expand source code
class UnownedBlockError(Exception):
    """Raised when there's an writable block without an owner on the BlockManager."""
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException
class WrappableFile (f)

An interface to an Arvados file that's compatible with io wrappers.

Expand source code
class WrappableFile(object):
    """An interface to an Arvados file that's compatible with io wrappers.

    """
    def __init__(self, f):
        self.f = f
        self.closed = False
    def close(self):
        self.closed = True
        return self.f.close()
    def flush(self):
        return self.f.flush()
    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)
    def readable(self):
        return self.f.readable()
    def readinto(self, *args, **kwargs):
        return self.f.readinto(*args, **kwargs)
    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)
    def seekable(self):
        return self.f.seekable()
    def tell(self):
        return self.f.tell()
    def writable(self):
        return self.f.writable()
    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)

Methods

def close(self)
def flush(self)
def read(self, *args, **kwargs)
def readable(self)
def readinto(self, *args, **kwargs)
def seek(self, *args, **kwargs)
def seekable(self)
def tell(self)
def writable(self)
def write(self, *args, **kwargs)