Module arvados.arvfile
Functions
def must_be_writable(orig_func)-
Expand source code
def must_be_writable(orig_func):
    """Decorator for methods that may only run on a writable object.

    The wrapped method's first argument (`self`) must provide a
    `writable()` method.  When it returns a false value the call is
    refused with `IOError(errno.EROFS, ...)`; otherwise the call is
    forwarded unchanged and its result is returned.
    """
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if self.writable():
            return orig_func(self, *args, **kwargs)
        raise IOError(errno.EROFS, "Collection is read-only.")

    return must_be_writable_wrapper
Expand source code
def split(path):
    """split(path) -> streamname, filename

    Break a /-separated stream path at its last '/' and return the tuple
    (stream_name, file_name).  A path that contains no '/' has no stream
    name, so '.' is used for it.
    """
    stream_name, slash, file_name = path.rpartition('/')
    if not slash:
        # No / in string: the whole path is the file name.
        return '.', path
    return stream_name, file_name
Separate the stream name and file name in a /-separated stream path and return a tuple (stream_name, file_name). If no stream name is available, assume '.'.
def synchronized(orig_func)-
Expand source code
def synchronized(orig_func):
    """Decorator that runs a method while holding `self.lock`.

    `self.lock` is used as a context manager around the call, so it is
    released again whether the wrapped method returns or raises.  The
    wrapped method's return value is passed through.
    """
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        held = self.lock
        with held:
            result = orig_func(self, *args, **kwargs)
        return result

    return synchronized_wrapper
Classes
class ArvadosFile (parent, name, stream=[], segments=[])-
Expand source code
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry')

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :parent:
          the object containing this file; its `root_collection().lock`
          becomes this file's lock

        :name:
          the name of this file

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        # NOTE: the list defaults above are only iterated / passed along by
        # this constructor, never mutated by it.
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        """Return whatever the parent's `writable()` reports."""
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        return any(KeepLocator(r.locator).permission_expired(as_of_dt)
                   for r in self._segments)

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""
        return any('+R' in s.locator for s in self._segments)

    @synchronized
    def _copy_remote_blocks(self, remote_blocks=None):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
            Shared cache of remote to local block mappings. This is used to avoid
            doing extra work when blocks are shared by more than one file in
            different subdirectories.  When omitted, a fresh empty cache is
            used for this call.

        Returns the (possibly updated) `remote_blocks` mapping.
        """
        # A `{}` default would be created once and then mutated below, so
        # every call that omitted the argument would share one process-wide
        # cache.  Create the dict per call instead.
        if remote_blocks is None:
            remote_blocks = {}

        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        """Return a shallow copy of the segment list."""
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        # Maps each bufferblock id seen in `other` to the locator used for it
        # in this file, so a bufferblock shared by several segments is only
        # resolved (or duplicated) once.
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        # Still writable: duplicate it for this file.
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        """Compare two files segment by segment.

        Files are equal when they have the same number of segments and each
        pair of segments has the same stripped locator, range start, range
        size and segment offset.  Bufferblock ids are resolved to their
        locators before comparing.
        """
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for seg1, seg2 in zip(self._segments, othersegs):
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                        seg1.range_start != seg2.range_start or
                        seg1.range_size != seg2.range_size or
                        seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        """Replace the segment list with `segs` (stored as-is, not copied)."""
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    # The cut falls inside this segment: keep its head only.
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            # Grow the file by appending ranges that point at the block
            # manager's padding block, at most KEEP_BLOCK_SIZE bytes each.
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            # Once some data has been collected, a non-exact read only takes
            # further blocks from the cache.
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        Returns `len(data)` for a write that fits in a single block.  Returns
        None when `data` is empty or when it was split into several
        block-sized writes.
        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            # Try repacking first; if the data still does not fit, commit
            # the block asynchronously and start a new one.
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        :num_retries:
          Accepted but not used by this method.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    # Point the segment at the bufferblock's locator.
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            # Each new range starts where the previous last range ends.
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        """Return one newline-terminated manifest line for this file.

        :stream_name:
          stream name passed to `normalize_stream`

        :portable_locators:
          If True, use `KeepLocator(loc).stripped()` for each locator.

        :normalize:
          Accepted but not used by this method.

        :only_committed:
          If True, skip segments that refer to bufferblocks.
        """
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Move this file to `newparent` under the name `newname`.

        The file is flushed synchronously and removed from its current
        parent first; afterwards it uses the new parent's root collection
        lock.
        """
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.
This object may be accessed from multiple threads.
ArvadosFile constructor.
:stream: a list of Range objects representing a block stream
:segments: a list of Range objects representing segments
Instance variables
var fuse_entry-
Expand source code
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry')

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :parent:
          the object containing this file; its `root_collection().lock`
          becomes this file's lock

        :name:
          the name of this file

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        # NOTE: the list defaults above are only iterated / passed along by
        # this constructor, never mutated by it.
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        """Return whatever the parent's `writable()` reports."""
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        return any(KeepLocator(r.locator).permission_expired(as_of_dt)
                   for r in self._segments)

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""
        return any('+R' in s.locator for s in self._segments)

    @synchronized
    def _copy_remote_blocks(self, remote_blocks=None):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
            Shared cache of remote to local block mappings. This is used to avoid
            doing extra work when blocks are shared by more than one file in
            different subdirectories.  When omitted, a fresh empty cache is
            used for this call.

        Returns the (possibly updated) `remote_blocks` mapping.
        """
        # A `{}` default would be created once and then mutated below, so
        # every call that omitted the argument would share one process-wide
        # cache.  Create the dict per call instead.
        if remote_blocks is None:
            remote_blocks = {}

        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        """Return a shallow copy of the segment list."""
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        # Maps each bufferblock id seen in `other` to the locator used for it
        # in this file, so a bufferblock shared by several segments is only
        # resolved (or duplicated) once.
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        # Still writable: duplicate it for this file.
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        """Compare two files segment by segment.

        Files are equal when they have the same number of segments and each
        pair of segments has the same stripped locator, range start, range
        size and segment offset.  Bufferblock ids are resolved to their
        locators before comparing.
        """
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for seg1, seg2 in zip(self._segments, othersegs):
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                        seg1.range_start != seg2.range_start or
                        seg1.range_size != seg2.range_size or
                        seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        """Replace the segment list with `segs` (stored as-is, not copied)."""
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    # The cut falls inside this segment: keep its head only.
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            # Grow the file by appending ranges that point at the block
            # manager's padding block, at most KEEP_BLOCK_SIZE bytes each.
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            # Once some data has been collected, a non-exact read only takes
            # further blocks from the cache.
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        Returns `len(data)` for a write that fits in a single block.  Returns
        None when `data` is empty or when it was split into several
        block-sized writes.
        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            # Try repacking first; if the data still does not fit, commit
            # the block asynchronously and start a new one.
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        :num_retries:
          Accepted but not used by this method.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    # Point the segment at the bufferblock's locator.
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            # Each new range starts where the previous last range ends.
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        """Return one newline-terminated manifest line for this file.

        :stream_name:
          stream name passed to `normalize_stream`

        :portable_locators:
          If True, use `KeepLocator(loc).stripped()` for each locator.

        :normalize:
          Accepted but not used by this method.

        :only_committed:
          If True, skip segments that refer to bufferblocks.
        """
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Move this file to `newparent` under the name `newname`.

        The file is flushed synchronously and removed from its current
        parent first; afterwards it uses the new parent's root collection
        lock.
        """
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
Expand source code
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry')

    def __init__(self, parent, name, stream=None, segments=None):
        """ArvadosFile constructor.

        :parent:
          the object that owns this file; its `root_collection().lock` is
          used as this file's lock

        :name:
          the name of this file

        :stream:
          a list of Range objects representing a block stream
          (an empty list when omitted)

        :segments:
          a list of Range objects representing segments
          (an empty list when omitted)
        """
        # None sentinels replace mutable `[]` defaults so that no list object
        # is shared between constructor calls.
        if stream is None:
            stream = []
        if segments is None:
            segments = []
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        """Return whatever the parent's `writable()` reports."""
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        return any(KeepLocator(r.locator).permission_expired(as_of_dt)
                   for r in self._segments)

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""
        return any('+R' in s.locator for s in self._segments)

    @synchronized
    def _copy_remote_blocks(self, remote_blocks=None):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
          Shared cache of remote to local block mappings. This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories. A dict passed in is updated in place; when
          omitted, a fresh dict is created for this call.

        Returns the (possibly updated) mapping.
        """
        # Test identity, not truthiness: a caller-supplied empty dict must
        # still be filled in place.
        if remote_blocks is None:
            remote_blocks = {}
        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        """Return a shallow copy of the segment list."""
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        # Maps a bufferblock id used by `other` to the locator this file
        # should reference instead, so each bufferblock is resolved once.
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        # Still writable: duplicate it with this file as owner.
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        """Compare two files segment by segment.

        Files are equal when they have the same number of segments and each
        pair of segments agrees on stripped locator, range_start, range_size
        and segment_offset. Bufferblock ids are resolved through the block
        manager's `locator()` before comparing.
        """
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for seg1, seg2 in zip(self._segments, othersegs):
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                        seg1.range_start != seg2.range_start or
                        seg1.range_size != seg2.range_size or
                        seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        """Return the negation of `__eq__`."""
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        """Replace the segment list with `segs` (stored as given, not copied)."""
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers.

        Objects that are not `ArvadosFileWriter` instances are ignored.
        """
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.

        Raises KeyError (from `set.remove`) if `writer` is not registered.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    # Segment straddles the new end: keep only its head.
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            # Grow by appending references to the padding block, at most
            # KEEP_BLOCK_SIZE bytes per segment.
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        # The lock is held only while the segment ranges are computed.
        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            # Once some data is in hand, later blocks are fetched cache-only
            # unless an exact read was requested.
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        Returns `len(data)` for a write that fits in one block. Returns None
        when `data` is empty or when it is larger than KEEP_BLOCK_SIZE and is
        written in chunks.
        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            # Try repacking first; start a new bufferblock only if the data
            # still does not fit.
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        :num_retries:
          accepted but not used by this method
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            # Each new segment starts where the previous one ends.
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        """Return one newline-terminated manifest line for this file.

        :stream_name:
          stream name passed to `normalize_stream`

        :portable_locators:
          if True, locators are reduced with `KeepLocator(...).stripped()`

        :normalize:
          accepted but not used by this method

        :only_committed:
          if True, segments that still point at a bufferblock are left out
        """
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Move this file to `newparent` under the name `newname`.

        The file is flushed synchronously and removed from its current parent
        before being re-attached; its lock is then taken from the new parent's
        root collection.
        """
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
Expand source code
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry')

    def __init__(self, parent, name, stream=None, segments=None):
        """ArvadosFile constructor.

        :parent:
          the object that owns this file; its `root_collection().lock` is
          used as this file's lock

        :name:
          the name of this file

        :stream:
          a list of Range objects representing a block stream
          (an empty list when omitted)

        :segments:
          a list of Range objects representing segments
          (an empty list when omitted)
        """
        # None sentinels replace mutable `[]` defaults so that no list object
        # is shared between constructor calls.
        if stream is None:
            stream = []
        if segments is None:
            segments = []
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        """Return whatever the parent's `writable()` reports."""
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        return any(KeepLocator(r.locator).permission_expired(as_of_dt)
                   for r in self._segments)

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""
        return any('+R' in s.locator for s in self._segments)

    @synchronized
    def _copy_remote_blocks(self, remote_blocks=None):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
          Shared cache of remote to local block mappings. This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories. A dict passed in is updated in place; when
          omitted, a fresh dict is created for this call.

        Returns the (possibly updated) mapping.
        """
        # Test identity, not truthiness: a caller-supplied empty dict must
        # still be filled in place.
        if remote_blocks is None:
            remote_blocks = {}
        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        """Return a shallow copy of the segment list."""
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        # Maps a bufferblock id used by `other` to the locator this file
        # should reference instead, so each bufferblock is resolved once.
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        # Still writable: duplicate it with this file as owner.
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        """Compare two files segment by segment.

        Files are equal when they have the same number of segments and each
        pair of segments agrees on stripped locator, range_start, range_size
        and segment_offset. Bufferblock ids are resolved through the block
        manager's `locator()` before comparing.
        """
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for seg1, seg2 in zip(self._segments, othersegs):
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                        seg1.range_start != seg2.range_start or
                        seg1.range_size != seg2.range_size or
                        seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        """Return the negation of `__eq__`."""
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        """Replace the segment list with `segs` (stored as given, not copied)."""
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers.

        Objects that are not `ArvadosFileWriter` instances are ignored.
        """
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.

        Raises KeyError (from `set.remove`) if `writer` is not registered.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    # Segment straddles the new end: keep only its head.
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            # Grow by appending references to the padding block, at most
            # KEEP_BLOCK_SIZE bytes per segment.
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        # The lock is held only while the segment ranges are computed.
        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            # Once some data is in hand, later blocks are fetched cache-only
            # unless an exact read was requested.
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        Returns `len(data)` for a write that fits in one block. Returns None
        when `data` is empty or when it is larger than KEEP_BLOCK_SIZE and is
        written in chunks.
        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            # Try repacking first; start a new bufferblock only if the data
            # still does not fit.
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        :num_retries:
          accepted but not used by this method
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            # Each new segment starts where the previous one ends.
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        """Return one newline-terminated manifest line for this file.

        :stream_name:
          stream name passed to `normalize_stream`

        :portable_locators:
          if True, locators are reduced with `KeepLocator(...).stripped()`

        :normalize:
          accepted but not used by this method

        :only_committed:
          if True, segments that still point at a bufferblock are left out
        """
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Move this file to `newparent` under the name `newname`.

        The file is flushed synchronously and removed from its current parent
        before being re-attached; its lock is then taken from the new parent's
        root collection.
        """
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
Expand source code
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry')

    def __init__(self, parent, name, stream=None, segments=None):
        """ArvadosFile constructor.

        :parent:
          the object that owns this file; its `root_collection().lock` is
          used as this file's lock

        :name:
          the name of this file

        :stream:
          a list of Range objects representing a block stream
          (an empty list when omitted)

        :segments:
          a list of Range objects representing segments
          (an empty list when omitted)
        """
        # None sentinels replace mutable `[]` defaults so that no list object
        # is shared between constructor calls.
        if stream is None:
            stream = []
        if segments is None:
            segments = []
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        """Return whatever the parent's `writable()` reports."""
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        return any(KeepLocator(r.locator).permission_expired(as_of_dt)
                   for r in self._segments)

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""
        return any('+R' in s.locator for s in self._segments)

    @synchronized
    def _copy_remote_blocks(self, remote_blocks=None):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
          Shared cache of remote to local block mappings. This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories. A dict passed in is updated in place; when
          omitted, a fresh dict is created for this call.

        Returns the (possibly updated) mapping.
        """
        # Test identity, not truthiness: a caller-supplied empty dict must
        # still be filled in place.
        if remote_blocks is None:
            remote_blocks = {}
        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        """Return a shallow copy of the segment list."""
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        # Maps a bufferblock id used by `other` to the locator this file
        # should reference instead, so each bufferblock is resolved once.
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        # Still writable: duplicate it with this file as owner.
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        """Compare two files segment by segment.

        Files are equal when they have the same number of segments and each
        pair of segments agrees on stripped locator, range_start, range_size
        and segment_offset. Bufferblock ids are resolved through the block
        manager's `locator()` before comparing.
        """
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for seg1, seg2 in zip(self._segments, othersegs):
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                        seg1.range_start != seg2.range_start or
                        seg1.range_size != seg2.range_size or
                        seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        """Return the negation of `__eq__`."""
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        """Replace the segment list with `segs` (stored as given, not copied)."""
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers.

        Objects that are not `ArvadosFileWriter` instances are ignored.
        """
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.

        Raises KeyError (from `set.remove`) if `writer` is not registered.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    # Segment straddles the new end: keep only its head.
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            # Grow by appending references to the padding block, at most
            # KEEP_BLOCK_SIZE bytes per segment.
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        # The lock is held only while the segment ranges are computed.
        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            # Once some data is in hand, later blocks are fetched cache-only
            # unless an exact read was requested.
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        Returns `len(data)` for a write that fits in one block. Returns None
        when `data` is empty or when it is larger than KEEP_BLOCK_SIZE and is
        written in chunks.
        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            # Try repacking first; start a new bufferblock only if the data
            # still does not fit.
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.

        :num_retries:
          accepted but not used by this method
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            # Each new segment starts where the previous one ends.
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        """Return one newline-terminated manifest line for this file.

        :stream_name:
          stream name passed to `normalize_stream`

        :portable_locators:
          if True, locators are reduced with `KeepLocator(...).stripped()`

        :normalize:
          accepted but not used by this method

        :only_committed:
          if True, segments that still point at a bufferblock are left out
        """
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Move this file to `newparent` under the name `newname`.

        The file is flushed synchronously and removed from its current parent
        before being re-attached; its lock is then taken from the new parent's
        root collection.
        """
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
Methods
def add_segment(self, blocks, pos, size)-
Expand source code
@must_be_writable @synchronized def add_segment(self, blocks, pos, size): """Add a segment to the end of the file. `pos` and `offset` reference a section of the stream described by `blocks` (a list of Range objects) """ self._add_segment(blocks, pos, size)Add a segment to the end of the file.
`pos` and `offset` reference a section of the stream described by `blocks` (a list of Range objects)
def add_writer(self, writer)-
Expand source code
@synchronized def add_writer(self, writer): """Add an ArvadosFileWriter reference to the list of writers""" if isinstance(writer, ArvadosFileWriter): self._writers.add(writer)Add an ArvadosFileWriter reference to the list of writers
def clone(self, new_parent, new_name)-
Expand source code
@synchronized def clone(self, new_parent, new_name): """Make a copy of this file.""" cp = ArvadosFile(new_parent, new_name) cp.replace_contents(self) return cpMake a copy of this file.
def closed(self)-
Expand source code
def closed(self):
    """Report whether this file is closed.

    The file is treated as closed when its set of registered writers
    is empty.
    """
    return not self._writers
def committed(self)-
Expand source code
@synchronized def committed(self): """Get whether this is committed or not.""" return self._committedGet whether this is committed or not.
def flush(self, sync=True, num_retries=0)-
Expand source code
@synchronized def flush(self, sync=True, num_retries=0): """Flush the current bufferblock to Keep. :sync: If True, commit block synchronously, wait until buffer block has been written. If False, commit block asynchronously, return immediately after putting block into the keep put queue. """ if self.committed(): return if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: if self._current_bblock.state() == _BufferBlock.WRITABLE: self._current_bblock.repack_writes() if self._current_bblock.state() != _BufferBlock.DELETED: self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) if sync: to_delete = set() for s in self._segments: bb = self.parent._my_block_manager().get_bufferblock(s.locator) if bb: if bb.state() != _BufferBlock.COMMITTED: self.parent._my_block_manager().commit_bufferblock(bb, sync=True) to_delete.add(s.locator) s.locator = bb.locator() for s in to_delete: # Don't delete the bufferblock if it's owned by many files. It'll be # deleted after all of its owners are flush()ed. if self.parent._my_block_manager().get_bufferblock(s).owner is self: self.parent._my_block_manager().delete_bufferblock(s) self.parent.notify(MOD, self.parent, self.name, (self, self))Flush the current bufferblock to Keep.
:sync: If True, commit block synchronously, wait until buffer block has been written. If False, commit block asynchronously, return immediately after putting block into the keep put queue.
def has_remote_blocks(self)-
Expand source code
@synchronized
def has_remote_blocks(self):
    """Return True if any segment's locator carries a ``+R`` signature.

    Returns False when no segment locator contains ``+R`` (including
    when the file has no segments at all).
    """
    return any('+R' in seg.locator for seg in self._segments)
def manifest_text(self,
stream_name='.',
portable_locators=False,
normalize=False,
only_committed=False)-
Expand source code
@synchronized def manifest_text(self, stream_name=".", portable_locators=False, normalize=False, only_committed=False): buf = "" filestream = [] for segment in self._segments: loc = segment.locator if self.parent._my_block_manager().is_bufferblock(loc): if only_committed: continue loc = self.parent._my_block_manager().get_bufferblock(loc).locator() if portable_locators: loc = KeepLocator(loc).stripped() filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, segment.segment_offset, segment.range_size)) buf += ' '.join(normalize_stream(stream_name, {self.name: filestream})) buf += "\n" return buf def permission_expired(self, as_of_dt=None)-
Expand source code
@synchronized
def permission_expired(self, as_of_dt=None):
    """Return True if any segment's locator is expired.

    Each segment locator is wrapped in a `KeepLocator` and asked
    whether its permission has expired; `as_of_dt` is passed through
    to that check unchanged.
    """
    return any(
        KeepLocator(seg.locator).permission_expired(as_of_dt)
        for seg in self._segments
    )
def readfrom(self, offset, size, num_retries, exact=False)-
Expand source code
def readfrom(self, offset, size, num_retries, exact=False): """Read up to `size` bytes from the file starting at `offset`. :exact: If False (default), return less data than requested if the read crosses a block boundary and the next block isn't cached. If True, only return less data than requested when hitting EOF. """ with self.lock: if size == 0 or offset >= self.size(): return b'' readsegs = locators_and_ranges(self._segments, offset, size) prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32) locs = set() data = [] for lr in readsegs: block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact)) if block: blockview = memoryview(block) data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes()) locs.add(lr.locator) else: break for lr in prefetch: if lr.locator not in locs: self.parent._my_block_manager().block_prefetch(lr.locator) locs.add(lr.locator) return b''.join(data)Read up to
`size` bytes from the file starting at `offset`.
:exact: If False (default), return less data than requested if the read crosses a block boundary and the next block isn't cached. If True, only return less data than requested when hitting EOF.
def remove_writer(self, writer, flush)-
Expand source code
@synchronized def remove_writer(self, writer, flush): """ Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks. """ self._writers.remove(writer) if flush or self.size() > config.KEEP_BLOCK_SIZE // 2: # File writer closed, not small enough for repacking self.flush() elif self.closed(): # All writers closed and size is adequate for repacking self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks.
def replace_contents(self, other)-
Expand source code
@must_be_writable @synchronized def replace_contents(self, other): """Replace segments of this file with segments from another `ArvadosFile` object.""" map_loc = {} self._segments = [] for other_segment in other.segments(): new_loc = other_segment.locator if other.parent._my_block_manager().is_bufferblock(other_segment.locator): if other_segment.locator not in map_loc: bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator) if bufferblock.state() != _BufferBlock.WRITABLE: map_loc[other_segment.locator] = bufferblock.locator() else: map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid new_loc = map_loc[other_segment.locator] self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) self.set_committed(False)Replace segments of this file with segments from another
`ArvadosFile` object.
def segments(self)-
Expand source code
@synchronized def segments(self): return copy.copy(self._segments) def set_committed(self, value=True)-
Expand source code
@synchronized def set_committed(self, value=True): """Set committed flag. If value is True, set committed to be True. If value is False, set committed to be False for this and all parents. """ if value == self._committed: return self._committed = value if self._committed is False and self.parent is not None: self.parent.set_committed(False)Set committed flag.
If value is True, set committed to be True.
If value is False, set committed to be False for this and all parents.
def set_segments(self, segs)-
Expand source code
@synchronized def set_segments(self, segs): self._segments = segs def size(self)-
Expand source code
@synchronized
def size(self):
    """Return the file size.

    The size is the end position of the last segment
    (``range_start + range_size``), or 0 when there are no segments.
    """
    if not self._segments:
        return 0
    tail = self._segments[-1]
    return tail.range_start + tail.range_size
def truncate(self, size)-
Expand source code
@must_be_writable @synchronized def truncate(self, size): """Shrink or expand the size of the file. If `size` is less than the size of the file, the file contents after `size` will be discarded. If `size` is greater than the current size of the file, it will be filled with zero bytes. """ if size < self.size(): new_segs = [] for r in self._segments: range_end = r.range_start+r.range_size if r.range_start >= size: # segment is past the trucate size, all done break elif size < range_end: nr = Range(r.locator, r.range_start, size - r.range_start, 0) nr.segment_offset = r.segment_offset new_segs.append(nr) break else: new_segs.append(r) self._segments = new_segs self.set_committed(False) elif size > self.size(): padding = self.parent._my_block_manager().get_padding_block() diff = size - self.size() while diff > config.KEEP_BLOCK_SIZE: self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0)) diff -= config.KEEP_BLOCK_SIZE if diff > 0: self._segments.append(Range(padding.blockid, self.size(), diff, 0)) self.set_committed(False) else: # size == self.size() passShrink or expand the size of the file.
If `size` is less than the size of the file, the file contents after `size` will be discarded. If `size` is greater than the current size of the file, it will be filled with zero bytes.
def writable(self)-
Expand source code
def writable(self): return self.parent.writable() def writeto(self, offset, data, num_retries)-
Expand source code
@must_be_writable @synchronized def writeto(self, offset, data, num_retries): """Write `data` to the file starting at `offset`. This will update existing bytes and/or extend the size of the file as necessary. """ if not isinstance(data, bytes) and not isinstance(data, memoryview): data = data.encode() if len(data) == 0: return if offset > self.size(): self.truncate(offset) if len(data) > config.KEEP_BLOCK_SIZE: # Chunk it up into smaller writes n = 0 dataview = memoryview(data) while n < len(data): self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries) n += config.KEEP_BLOCK_SIZE return self.set_committed(False) if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: self._current_bblock.repack_writes() if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) self._current_bblock.append(data) replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) self.parent.notify(WRITE, self.parent, self.name, (self, self)) return len(data)Write
`data` to the file starting at `offset`.
This will update existing bytes and/or extend the size of the file as necessary.
class ArvadosFileReader (arvadosfile, mode='r', num_retries=None)-
Expand source code
class ArvadosFileReader(ArvadosFileReaderBase): """Wraps ArvadosFile in a file-like object supporting reading only. Be aware that this class is NOT thread safe as there is no locking around updating file pointer. """ def __init__(self, arvadosfile, mode="r", num_retries=None): super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries) self.arvadosfile = arvadosfile def size(self): return self.arvadosfile.size() def stream_name(self): return self.arvadosfile.parent.stream_name() def readinto(self, b): data = self.read(len(b)) b[:len(data)] = data return len(data) @_FileLikeObjectBase._before_close @retry_method def read(self, size=None, num_retries=None): """Read up to `size` bytes from the file and return the result. Starts at the current file position. If `size` is None, read the entire remainder of the file. """ if size is None: data = [] rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) while rd: data.append(rd) self._filepos += len(rd) rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) return b''.join(data) else: data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True) self._filepos += len(data) return data @_FileLikeObjectBase._before_close @retry_method def readfrom(self, offset, size, num_retries=None): """Read up to `size` bytes from the stream, starting at the specified file offset. This method does not change the file position. """ return self.arvadosfile.readfrom(offset, size, num_retries) def flush(self): passWraps ArvadosFile in a file-like object supporting reading only.
Be aware that this class is NOT thread safe as there is no locking around updating file pointer.
Ancestors
- ArvadosFileReaderBase
- arvados.arvfile._FileLikeObjectBase
Subclasses
Methods
def flush(self)-
Expand source code
def flush(self): pass def read(self, size=None, num_retries=None)-
Expand source code
@_FileLikeObjectBase._before_close @retry_method def read(self, size=None, num_retries=None): """Read up to `size` bytes from the file and return the result. Starts at the current file position. If `size` is None, read the entire remainder of the file. """ if size is None: data = [] rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) while rd: data.append(rd) self._filepos += len(rd) rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) return b''.join(data) else: data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True) self._filepos += len(data) return dataRead up to
`size` bytes from the file and return the result.
Starts at the current file position. If `size` is None, read the entire remainder of the file.
def readfrom(self, offset, size, num_retries=None)-
Expand source code
@_FileLikeObjectBase._before_close @retry_method def readfrom(self, offset, size, num_retries=None): """Read up to `size` bytes from the stream, starting at the specified file offset. This method does not change the file position. """ return self.arvadosfile.readfrom(offset, size, num_retries)Read up to
`size` bytes from the stream, starting at the specified file offset.
This method does not change the file position.
def readinto(self, b)-
Expand source code
def readinto(self, b):
    """Read into the writable buffer `b` and return the number of bytes stored.

    Asks `read()` for at most ``len(b)`` bytes and copies whatever comes
    back into the front of `b`; the rest of `b` is left untouched.
    """
    chunk = self.read(len(b))
    count = len(chunk)
    b[:count] = chunk
    return count
Expand source code
def size(self): return self.arvadosfile.size() def stream_name(self)-
Expand source code
def stream_name(self): return self.arvadosfile.parent.stream_name()
class ArvadosFileReaderBase (name, mode, num_retries=None)-
Expand source code
class ArvadosFileReaderBase(_FileLikeObjectBase): def __init__(self, name, mode, num_retries=None): super(ArvadosFileReaderBase, self).__init__(name, mode) self._filepos = 0 self.num_retries = num_retries self._readline_cache = (None, None) def __iter__(self): while True: data = self.readline() if not data: break yield data def decompressed_name(self): return re.sub('\.(bz2|gz)$', '', self.name) @_FileLikeObjectBase._before_close def seek(self, pos, whence=os.SEEK_SET): if whence == os.SEEK_CUR: pos += self._filepos elif whence == os.SEEK_END: pos += self.size() if pos < 0: raise IOError(errno.EINVAL, "Tried to seek to negative file offset.") self._filepos = pos return self._filepos def tell(self): return self._filepos def readable(self): return True def writable(self): return False def seekable(self): return True @_FileLikeObjectBase._before_close @retry_method def readall(self, size=2**20, num_retries=None): while True: data = self.read(size, num_retries=num_retries) if len(data) == 0: break yield data @_FileLikeObjectBase._before_close @retry_method def readline(self, size=float('inf'), num_retries=None): cache_pos, cache_data = self._readline_cache if self.tell() == cache_pos: data = [cache_data] self._filepos += len(cache_data) else: data = [b''] data_size = len(data[-1]) while (data_size < size) and (b'\n' not in data[-1]): next_read = self.read(2 ** 20, num_retries=num_retries) if not next_read: break data.append(next_read) data_size += len(next_read) data = b''.join(data) try: nextline_index = data.index(b'\n') + 1 except ValueError: nextline_index = len(data) nextline_index = min(nextline_index, size) self._filepos -= len(data) - nextline_index self._readline_cache = (self.tell(), data[nextline_index:]) return data[:nextline_index].decode() @_FileLikeObjectBase._before_close @retry_method def decompress(self, decompress, size, num_retries=None): for segment in self.readall(size, num_retries=num_retries): data = decompress(segment) if data: yield data 
@_FileLikeObjectBase._before_close @retry_method def readall_decompressed(self, size=2**20, num_retries=None): self.seek(0) if self.name.endswith('.bz2'): dc = bz2.BZ2Decompressor() return self.decompress(dc.decompress, size, num_retries=num_retries) elif self.name.endswith('.gz'): dc = zlib.decompressobj(16+zlib.MAX_WBITS) return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size, num_retries=num_retries) else: return self.readall(size, num_retries=num_retries) @_FileLikeObjectBase._before_close @retry_method def readlines(self, sizehint=float('inf'), num_retries=None): data = [] data_size = 0 for s in self.readall(num_retries=num_retries): data.append(s) data_size += len(s) if data_size >= sizehint: break return b''.join(data).decode().splitlines(True) def size(self): raise IOError(errno.ENOSYS, "Not implemented") def read(self, size, num_retries=None): raise IOError(errno.ENOSYS, "Not implemented") def readfrom(self, start, size, num_retries=None): raise IOError(errno.ENOSYS, "Not implemented")Ancestors
- arvados.arvfile._FileLikeObjectBase
Subclasses
Methods
def decompress(self, decompress, size, num_retries=None)-
Expand source code
@_FileLikeObjectBase._before_close @retry_method def decompress(self, decompress, size, num_retries=None): for segment in self.readall(size, num_retries=num_retries): data = decompress(segment) if data: yield data def decompressed_name(self)-
Expand source code
def decompressed_name(self):
    """Return `self.name` with a trailing ``.bz2`` or ``.gz`` suffix removed.

    Names that do not end in one of those suffixes are returned unchanged.
    """
    # Raw string: in a plain literal '\.' is an invalid escape sequence,
    # which newer Python versions warn about. The regex itself is unchanged.
    return re.sub(r'\.(bz2|gz)$', '', self.name)
Expand source code
def read(self, size, num_retries=None): raise IOError(errno.ENOSYS, "Not implemented") def readable(self)-
Expand source code
def readable(self): return True def readall(self, size=1048576, num_retries=None)-
Expand source code
@_FileLikeObjectBase._before_close @retry_method def readall(self, size=2**20, num_retries=None): while True: data = self.read(size, num_retries=num_retries) if len(data) == 0: break yield data def readall_decompressed(self, size=1048576, num_retries=None)-
Expand source code
@_FileLikeObjectBase._before_close @retry_method def readall_decompressed(self, size=2**20, num_retries=None): self.seek(0) if self.name.endswith('.bz2'): dc = bz2.BZ2Decompressor() return self.decompress(dc.decompress, size, num_retries=num_retries) elif self.name.endswith('.gz'): dc = zlib.decompressobj(16+zlib.MAX_WBITS) return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size, num_retries=num_retries) else: return self.readall(size, num_retries=num_retries) def readfrom(self, start, size, num_retries=None)-
Expand source code
def readfrom(self, start, size, num_retries=None): raise IOError(errno.ENOSYS, "Not implemented") def readline(self, size=inf, num_retries=None)-
Expand source code
@_FileLikeObjectBase._before_close @retry_method def readline(self, size=float('inf'), num_retries=None): cache_pos, cache_data = self._readline_cache if self.tell() == cache_pos: data = [cache_data] self._filepos += len(cache_data) else: data = [b''] data_size = len(data[-1]) while (data_size < size) and (b'\n' not in data[-1]): next_read = self.read(2 ** 20, num_retries=num_retries) if not next_read: break data.append(next_read) data_size += len(next_read) data = b''.join(data) try: nextline_index = data.index(b'\n') + 1 except ValueError: nextline_index = len(data) nextline_index = min(nextline_index, size) self._filepos -= len(data) - nextline_index self._readline_cache = (self.tell(), data[nextline_index:]) return data[:nextline_index].decode() def readlines(self, sizehint=inf, num_retries=None)-
Expand source code
@_FileLikeObjectBase._before_close @retry_method def readlines(self, sizehint=float('inf'), num_retries=None): data = [] data_size = 0 for s in self.readall(num_retries=num_retries): data.append(s) data_size += len(s) if data_size >= sizehint: break return b''.join(data).decode().splitlines(True) def seek(self, pos, whence=0)-
Expand source code
@_FileLikeObjectBase._before_close def seek(self, pos, whence=os.SEEK_SET): if whence == os.SEEK_CUR: pos += self._filepos elif whence == os.SEEK_END: pos += self.size() if pos < 0: raise IOError(errno.EINVAL, "Tried to seek to negative file offset.") self._filepos = pos return self._filepos def seekable(self)-
Expand source code
def seekable(self): return True def size(self)-
Expand source code
def size(self): raise IOError(errno.ENOSYS, "Not implemented") def tell(self)-
Expand source code
def tell(self): return self._filepos def writable(self)-
Expand source code
def writable(self): return False
class ArvadosFileWriter (arvadosfile, mode, num_retries=None)-
Expand source code
class ArvadosFileWriter(ArvadosFileReader): """Wraps ArvadosFile in a file-like object supporting both reading and writing. Be aware that this class is NOT thread safe as there is no locking around updating file pointer. """ def __init__(self, arvadosfile, mode, num_retries=None): super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries) self.arvadosfile.add_writer(self) def writable(self): return True @_FileLikeObjectBase._before_close @retry_method def write(self, data, num_retries=None): if self.mode[0] == "a": self._filepos = self.size() self.arvadosfile.writeto(self._filepos, data, num_retries) self._filepos += len(data) return len(data) @_FileLikeObjectBase._before_close @retry_method def writelines(self, seq, num_retries=None): for s in seq: self.write(s, num_retries=num_retries) @_FileLikeObjectBase._before_close def truncate(self, size=None): if size is None: size = self._filepos self.arvadosfile.truncate(size) @_FileLikeObjectBase._before_close def flush(self): self.arvadosfile.flush() def close(self, flush=True): if not self.closed: self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close()Wraps ArvadosFile in a file-like object supporting both reading and writing.
Be aware that this class is NOT thread safe as there is no locking around updating file pointer.
Ancestors
- ArvadosFileReader
- ArvadosFileReaderBase
- arvados.arvfile._FileLikeObjectBase
Methods
def close(self, flush=True)-
Expand source code
def close(self, flush=True): if not self.closed: self.arvadosfile.remove_writer(self, flush) super(ArvadosFileWriter, self).close() def flush(self)-
Expand source code
@_FileLikeObjectBase._before_close def flush(self): self.arvadosfile.flush() def truncate(self, size=None)-
Expand source code
@_FileLikeObjectBase._before_close def truncate(self, size=None): if size is None: size = self._filepos self.arvadosfile.truncate(size) def writable(self)-
Expand source code
def writable(self): return True def write(self, data, num_retries=None)-
Expand source code
@_FileLikeObjectBase._before_close
@retry_method
def write(self, data, num_retries=None):
    """Write `data` at the current file position and advance the position.

    When the mode starts with ``"a"`` the position is first moved to the
    end of the file, so the data is appended.

    :data: bytes or memoryview to write; any other object is converted
      with its ``encode()`` method first.
    :num_retries: passed through to `ArvadosFile.writeto`.

    Returns ``len(data)`` of the object the caller passed in.
    """
    if self.mode[0] == "a":
        self._filepos = self.size()
    # writeto() encodes non-bytes input before storing it. Encode here so
    # the position advances by the number of bytes actually stored;
    # advancing by len() of a non-ASCII str would leave the position short
    # and the next write would overwrite the tail of this one.
    if isinstance(data, (bytes, memoryview)):
        payload = data
    else:
        payload = data.encode()
    self.arvadosfile.writeto(self._filepos, payload, num_retries)
    self._filepos += len(payload)
    return len(data)
Expand source code
@_FileLikeObjectBase._before_close @retry_method def writelines(self, seq, num_retries=None): for s in seq: self.write(s, num_retries=num_retries)
Inherited members
class NoopLock-
Expand source code
class NoopLock(object):
    """Lock-shaped object whose operations all do nothing.

    It offers the context-manager protocol plus `acquire()` and
    `release()`, none of which block or keep any state.
    """

    def acquire(self, blocking=False):
        """Do nothing; `blocking` is ignored. Returns None."""
        return None

    def release(self):
        """Do nothing. Returns None."""
        return None

    def __enter__(self):
        """Enter the ``with`` block, yielding this object."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the ``with`` block; exceptions are not suppressed."""
        return None
def acquire(self, blocking=False)-
Expand source code
def acquire(self, blocking=False): pass def release(self)-
Expand source code
def release(self): pass
class StateChangeError (message, state, nextstate)-
Expand source code
class StateChangeError(Exception):
    """Exception carrying a message plus a `state` and a `nextstate`.

    :message: passed to `Exception` as the error text.
    :state: stored on the instance as ``self.state``.
    :nextstate: stored on the instance as ``self.nextstate``.
    """

    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        # Keep both values so handlers can inspect them.
        self.nextstate = nextstate
        self.state = state
Ancestors
- builtins.Exception
- builtins.BaseException
class StreamFileReader (stream, segments, name)-
Expand source code
class StreamFileReader(ArvadosFileReaderBase): class _NameAttribute(str): # The Python file API provides a plain .name attribute. # Older SDK provided a name() method. # This class provides both, for maximum compatibility. def __call__(self): return self def __init__(self, stream, segments, name): super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries) self._stream = stream self.segments = segments def stream_name(self): return self._stream.name() def size(self): n = self.segments[-1] return n.range_start + n.range_size @_FileLikeObjectBase._before_close @retry_method def read(self, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at the current file position""" if size == 0: return b'' data = b'' available_chunks = locators_and_ranges(self.segments, self._filepos, size) if available_chunks: lr = available_chunks[0] data = self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, num_retries=num_retries) self._filepos += len(data) return data @_FileLikeObjectBase._before_close @retry_method def readfrom(self, start, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at 'start'""" if size == 0: return b'' data = [] for lr in locators_and_ranges(self.segments, start, size): data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, num_retries=num_retries)) return b''.join(data) def as_manifest(self): segs = [] for r in self.segments: segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size)) return " ".join(normalize_stream(".", {self.name: segs})) + "\n"Ancestors
- ArvadosFileReaderBase
- arvados.arvfile._FileLikeObjectBase
Methods
def as_manifest(self)-
Expand source code
def as_manifest(self): segs = [] for r in self.segments: segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size)) return " ".join(normalize_stream(".", {self.name: segs})) + "\n" def read(self, size, num_retries=None)-
Expand source code
@_FileLikeObjectBase._before_close @retry_method def read(self, size, num_retries=None): """Read up to 'size' bytes from the stream, starting at the current file position""" if size == 0: return b'' data = b'' available_chunks = locators_and_ranges(self.segments, self._filepos, size) if available_chunks: lr = available_chunks[0] data = self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, num_retries=num_retries) self._filepos += len(data) return dataRead up to 'size' bytes from the stream, starting at the current file position
def readfrom(self, start, size, num_retries=None)-
Expand source code
@_FileLikeObjectBase._before_close
@retry_method
def readfrom(self, start, size, num_retries=None):
    """Read up to 'size' bytes from the stream, starting at 'start'.

    Every range that `locators_and_ranges` reports for the request is
    read from the underlying stream, and the pieces are concatenated in
    order. A `size` of 0 returns ``b''`` without touching the stream.
    """
    if size == 0:
        return b''
    pieces = [
        self._stream.readfrom(chunk.locator+chunk.segment_offset,
                              chunk.segment_size,
                              num_retries=num_retries)
        for chunk in locators_and_ranges(self.segments, start, size)
    ]
    return b''.join(pieces)
def size(self)-
Expand source code
def size(self):
    """Return the file size.

    The size is the end position of the last segment
    (``range_start + range_size``). A reader with no segments has
    size 0, matching `ArvadosFile.size`, instead of raising IndexError.
    """
    if not self.segments:
        return 0
    last = self.segments[-1]
    return last.range_start + last.range_size
Expand source code
def stream_name(self): return self._stream.name()
class UnownedBlockError (*args, **kwargs)-
Expand source code
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass
Ancestors
- builtins.Exception
- builtins.BaseException
class WrappableFile (f)-
Expand source code
class WrappableFile(object):
    """An interface to an Arvados file that's compatible with io wrappers.

    Each method forwards to the same-named method of the wrapped object
    `f`. The wrapper also keeps its own `closed` flag, which starts out
    False and is set by `close()`.
    """

    def __init__(self, f):
        self.closed = False
        self.f = f

    def close(self):
        """Mark this wrapper closed, then close the wrapped file."""
        # The flag is set first, so it is True even if f.close() raises.
        self.closed = True
        result = self.f.close()
        return result

    def flush(self):
        """Forward to ``f.flush()``."""
        return self.f.flush()

    def read(self, *args, **kwargs):
        """Forward to ``f.read()`` with the same arguments."""
        return self.f.read(*args, **kwargs)

    def readinto(self, *args, **kwargs):
        """Forward to ``f.readinto()`` with the same arguments."""
        return self.f.readinto(*args, **kwargs)

    def write(self, *args, **kwargs):
        """Forward to ``f.write()`` with the same arguments."""
        return self.f.write(*args, **kwargs)

    def seek(self, *args, **kwargs):
        """Forward to ``f.seek()`` with the same arguments."""
        return self.f.seek(*args, **kwargs)

    def tell(self):
        """Forward to ``f.tell()``."""
        return self.f.tell()

    def readable(self):
        """Forward to ``f.readable()``."""
        return self.f.readable()

    def writable(self):
        """Forward to ``f.writable()``."""
        return self.f.writable()

    def seekable(self):
        """Forward to ``f.seekable()``."""
        return self.f.seekable()
Methods
def close(self)-
Expand source code
def close(self): self.closed = True return self.f.close() def flush(self)-
Expand source code
def flush(self): return self.f.flush() def read(self, *args, **kwargs)-
Expand source code
def read(self, *args, **kwargs): return self.f.read(*args, **kwargs) def readable(self)-
Expand source code
def readable(self): return self.f.readable() def readinto(self, *args, **kwargs)-
Expand source code
def readinto(self, *args, **kwargs): return self.f.readinto(*args, **kwargs) def seek(self, *args, **kwargs)-
Expand source code
def seek(self, *args, **kwargs): return self.f.seek(*args, **kwargs) def seekable(self)-
Expand source code
def seekable(self): return self.f.seekable() def tell(self)-
Expand source code
def tell(self): return self.f.tell() def writable(self)-
Expand source code
def writable(self): return self.f.writable() def write(self, *args, **kwargs)-
Expand source code
def write(self, *args, **kwargs): return self.f.write(*args, **kwargs)