arvados.arvfile

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4
   5from __future__ import absolute_import
   6from __future__ import division
   7from future import standard_library
   8from future.utils import listitems, listvalues
   9standard_library.install_aliases()
  10from builtins import range
  11from builtins import object
  12import bz2
  13import collections
  14import copy
  15import errno
  16import functools
  17import hashlib
  18import logging
  19import os
  20import queue
  21import re
  22import sys
  23import threading
  24import uuid
  25import zlib
  26
  27from . import config
  28from .errors import KeepWriteError, AssertionError, ArgumentError
  29from .keep import KeepLocator
  30from ._normalize_stream import normalize_stream
  31from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
  32from .retry import retry_method
  33
  34MOD = "mod"
  35WRITE = "write"
  36
  37_logger = logging.getLogger('arvados.arvfile')
  38
  39def split(path):
  40    """split(path) -> streamname, filename
  41
  42    Separate the stream name and file name in a /-separated stream path and
  43    return a tuple (stream_name, file_name).  If no stream name is available,
  44    assume '.'.
  45
  46    """
  47    try:
  48        stream_name, file_name = path.rsplit('/', 1)
  49    except ValueError:  # No / in string
  50        stream_name, file_name = '.', path
  51    return stream_name, file_name
  52
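A quick illustration of split() as described in the docstring (editor-added sketch, not part of the module source):

    from arvados.arvfile import split

    assert split('output/logs/stdout.txt') == ('output/logs', 'stdout.txt')
    assert split('stdout.txt') == ('.', 'stdout.txt')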
  53
  54class UnownedBlockError(Exception):
  55    """Raised when there's an writable block without an owner on the BlockManager."""
  56    pass
  57
  58
  59class _FileLikeObjectBase(object):
  60    def __init__(self, name, mode):
  61        self.name = name
  62        self.mode = mode
  63        self.closed = False
  64
  65    @staticmethod
  66    def _before_close(orig_func):
  67        @functools.wraps(orig_func)
  68        def before_close_wrapper(self, *args, **kwargs):
  69            if self.closed:
  70                raise ValueError("I/O operation on closed stream file")
  71            return orig_func(self, *args, **kwargs)
  72        return before_close_wrapper
  73
  74    def __enter__(self):
  75        return self
  76
  77    def __exit__(self, exc_type, exc_value, traceback):
  78        try:
  79            self.close()
  80        except Exception:
  81            if exc_type is None:
  82                raise
  83
  84    def close(self):
  85        self.closed = True
  86
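The context-manager protocol and the _before_close guard above are reused by every file-like class in this module. A minimal editor-added sketch (not part of the module source):

    class _Probe(_FileLikeObjectBase):
        @_FileLikeObjectBase._before_close
        def ping(self):
            return 'pong'

    p = _Probe('probe', 'r')
    with p:
        assert p.ping() == 'pong'
    # After the with-block, p.closed is True, so p.ping() would raise
    # ValueError("I/O operation on closed stream file").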
  87
  88class ArvadosFileReaderBase(_FileLikeObjectBase):
  89    def __init__(self, name, mode, num_retries=None):
  90        super(ArvadosFileReaderBase, self).__init__(name, mode)
  91        self._filepos = 0
  92        self.num_retries = num_retries
  93        self._readline_cache = (None, None)
  94
  95    def __iter__(self):
  96        while True:
  97            data = self.readline()
  98            if not data:
  99                break
 100            yield data
 101
 102    def decompressed_name(self):
 103        return re.sub(r'\.(bz2|gz)$', '', self.name)
 104
 105    @_FileLikeObjectBase._before_close
 106    def seek(self, pos, whence=os.SEEK_SET):
 107        if whence == os.SEEK_CUR:
 108            pos += self._filepos
 109        elif whence == os.SEEK_END:
 110            pos += self.size()
 111        if pos < 0:
 112            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
 113        self._filepos = pos
 114        return self._filepos
 115
 116    def tell(self):
 117        return self._filepos
 118
 119    def readable(self):
 120        return True
 121
 122    def writable(self):
 123        return False
 124
 125    def seekable(self):
 126        return True
 127
 128    @_FileLikeObjectBase._before_close
 129    @retry_method
 130    def readall(self, size=2**20, num_retries=None):
 131        while True:
 132            data = self.read(size, num_retries=num_retries)
 133            if len(data) == 0:
 134                break
 135            yield data
 136
 137    @_FileLikeObjectBase._before_close
 138    @retry_method
 139    def readline(self, size=float('inf'), num_retries=None):
 140        cache_pos, cache_data = self._readline_cache
 141        if self.tell() == cache_pos:
 142            data = [cache_data]
 143            self._filepos += len(cache_data)
 144        else:
 145            data = [b'']
 146        data_size = len(data[-1])
 147        while (data_size < size) and (b'\n' not in data[-1]):
 148            next_read = self.read(2 ** 20, num_retries=num_retries)
 149            if not next_read:
 150                break
 151            data.append(next_read)
 152            data_size += len(next_read)
 153        data = b''.join(data)
 154        try:
 155            nextline_index = data.index(b'\n') + 1
 156        except ValueError:
 157            nextline_index = len(data)
 158        nextline_index = min(nextline_index, size)
 159        self._filepos -= len(data) - nextline_index
 160        self._readline_cache = (self.tell(), data[nextline_index:])
 161        return data[:nextline_index].decode()
 162
 163    @_FileLikeObjectBase._before_close
 164    @retry_method
 165    def decompress(self, decompress, size, num_retries=None):
 166        for segment in self.readall(size, num_retries=num_retries):
 167            data = decompress(segment)
 168            if data:
 169                yield data
 170
 171    @_FileLikeObjectBase._before_close
 172    @retry_method
 173    def readall_decompressed(self, size=2**20, num_retries=None):
 174        self.seek(0)
 175        if self.name.endswith('.bz2'):
 176            dc = bz2.BZ2Decompressor()
 177            return self.decompress(dc.decompress, size,
 178                                   num_retries=num_retries)
 179        elif self.name.endswith('.gz'):
 180            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
 181            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
 182                                   size, num_retries=num_retries)
 183        else:
 184            return self.readall(size, num_retries=num_retries)
 185
 186    @_FileLikeObjectBase._before_close
 187    @retry_method
 188    def readlines(self, sizehint=float('inf'), num_retries=None):
 189        data = []
 190        data_size = 0
 191        for s in self.readall(num_retries=num_retries):
 192            data.append(s)
 193            data_size += len(s)
 194            if data_size >= sizehint:
 195                break
 196        return b''.join(data).decode().splitlines(True)
 197
 198    def size(self):
 199        raise IOError(errno.ENOSYS, "Not implemented")
 200
 201    def read(self, size, num_retries=None):
 202        raise IOError(errno.ENOSYS, "Not implemented")
 203
 204    def readfrom(self, start, size, num_retries=None):
 205        raise IOError(errno.ENOSYS, "Not implemented")
 206
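Subclasses only need to supply read() and size(); readline(), __iter__() and the decompression helpers are built on top of them. An editor-added, in-memory sketch of that contract (not part of the module source):

    class _BytesReader(ArvadosFileReaderBase):
        """Minimal reader over an in-memory bytes object (illustration only)."""
        def __init__(self, name, data):
            super(_BytesReader, self).__init__(name, 'rb')
            self._data = data

        def size(self):
            return len(self._data)

        def read(self, size, num_retries=None):
            chunk = self._data[self._filepos:self._filepos + size]
            self._filepos += len(chunk)
            return chunk

    r = _BytesReader('example.txt', b'first\nsecond\nthird\n')
    assert [line.rstrip() for line in r] == ['first', 'second', 'third']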
 207
 208class StreamFileReader(ArvadosFileReaderBase):
 209    class _NameAttribute(str):
 210        # The Python file API provides a plain .name attribute.
 211        # Older SDK provided a name() method.
 212        # This class provides both, for maximum compatibility.
 213        def __call__(self):
 214            return self
 215
 216    def __init__(self, stream, segments, name):
 217        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
 218        self._stream = stream
 219        self.segments = segments
 220
 221    def stream_name(self):
 222        return self._stream.name()
 223
 224    def size(self):
 225        n = self.segments[-1]
 226        return n.range_start + n.range_size
 227
 228    @_FileLikeObjectBase._before_close
 229    @retry_method
 230    def read(self, size, num_retries=None):
 231        """Read up to 'size' bytes from the stream, starting at the current file position"""
 232        if size == 0:
 233            return b''
 234
 235        data = b''
 236        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
 237        if available_chunks:
 238            lr = available_chunks[0]
 239            data = self._stream.readfrom(lr.locator+lr.segment_offset,
 240                                         lr.segment_size,
 241                                         num_retries=num_retries)
 242
 243        self._filepos += len(data)
 244        return data
 245
 246    @_FileLikeObjectBase._before_close
 247    @retry_method
 248    def readfrom(self, start, size, num_retries=None):
 249        """Read up to 'size' bytes from the stream, starting at 'start'"""
 250        if size == 0:
 251            return b''
 252
 253        data = []
 254        for lr in locators_and_ranges(self.segments, start, size):
 255            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
 256                                              num_retries=num_retries))
 257        return b''.join(data)
 258
 259    def as_manifest(self):
 260        segs = []
 261        for r in self.segments:
 262            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
 263        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
 264
 265
 266def synchronized(orig_func):
 267    @functools.wraps(orig_func)
 268    def synchronized_wrapper(self, *args, **kwargs):
 269        with self.lock:
 270            return orig_func(self, *args, **kwargs)
 271    return synchronized_wrapper
 272
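The @synchronized decorator simply serializes method calls on the instance's own self.lock. Editor-added sketch (not part of the module source):

    class _Counter(object):
        def __init__(self):
            self.lock = threading.Lock()
            self.value = 0

        @synchronized
        def bump(self):
            self.value += 1

    c = _Counter()
    c.bump()          # acquires c.lock, increments, releases
    assert c.value == 1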
 273
 274class StateChangeError(Exception):
 275    def __init__(self, message, state, nextstate):
 276        super(StateChangeError, self).__init__(message)
 277        self.state = state
 278        self.nextstate = nextstate
 279
 280class _BufferBlock(object):
 281    """A stand-in for a Keep block that is in the process of being written.
 282
 283    Writers can append to it, get the size, and compute the Keep locator.
 284    There are three valid states:
 285
 286    WRITABLE
 287      Can append to block.
 288
 289    PENDING
 290      Block is in the process of being uploaded to Keep, append is an error.
 291
 292    COMMITTED
 293      The block has been written to Keep, its internal buffer has been
 294      released, fetching the block will fetch it via keep client (since we
 295      discarded the internal copy), and identifiers referring to the BufferBlock
 296      can be replaced with the block locator.
 297
 298    """
 299
 300    WRITABLE = 0
 301    PENDING = 1
 302    COMMITTED = 2
 303    ERROR = 3
 304    DELETED = 4
 305
 306    def __init__(self, blockid, starting_capacity, owner):
 307        """
 308        :blockid:
 309          the identifier for this block
 310
 311        :starting_capacity:
 312          the initial buffer capacity
 313
 314        :owner:
 315          ArvadosFile that owns this block
 316
 317        """
 318        self.blockid = blockid
 319        self.buffer_block = bytearray(starting_capacity)
 320        self.buffer_view = memoryview(self.buffer_block)
 321        self.write_pointer = 0
 322        self._state = _BufferBlock.WRITABLE
 323        self._locator = None
 324        self.owner = owner
 325        self.lock = threading.Lock()
 326        self.wait_for_commit = threading.Event()
 327        self.error = None
 328
 329    @synchronized
 330    def append(self, data):
 331        """Append some data to the buffer.
 332
 333        Only valid if the block is in WRITABLE state.  Implements an expanding
  334        buffer, doubling capacity as needed to accommodate all the data.
 335
 336        """
 337        if self._state == _BufferBlock.WRITABLE:
 338            if not isinstance(data, bytes) and not isinstance(data, memoryview):
 339                data = data.encode()
 340            while (self.write_pointer+len(data)) > len(self.buffer_block):
 341                new_buffer_block = bytearray(len(self.buffer_block) * 2)
 342                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
 343                self.buffer_block = new_buffer_block
 344                self.buffer_view = memoryview(self.buffer_block)
 345            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
 346            self.write_pointer += len(data)
 347            self._locator = None
 348        else:
 349            raise AssertionError("Buffer block is not writable")
 350
 351    STATE_TRANSITIONS = frozenset([
 352            (WRITABLE, PENDING),
 353            (PENDING, COMMITTED),
 354            (PENDING, ERROR),
 355            (ERROR, PENDING)])
 356
 357    @synchronized
 358    def set_state(self, nextstate, val=None):
 359        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
 360            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
 361        self._state = nextstate
 362
 363        if self._state == _BufferBlock.PENDING:
 364            self.wait_for_commit.clear()
 365
 366        if self._state == _BufferBlock.COMMITTED:
 367            self._locator = val
 368            self.buffer_view = None
 369            self.buffer_block = None
 370            self.wait_for_commit.set()
 371
 372        if self._state == _BufferBlock.ERROR:
 373            self.error = val
 374            self.wait_for_commit.set()
 375
 376    @synchronized
 377    def state(self):
 378        return self._state
 379
 380    def size(self):
 381        """The amount of data written to the buffer."""
 382        return self.write_pointer
 383
 384    @synchronized
 385    def locator(self):
 386        """The Keep locator for this buffer's contents."""
 387        if self._locator is None:
 388            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
 389        return self._locator
 390
 391    @synchronized
 392    def clone(self, new_blockid, owner):
 393        if self._state == _BufferBlock.COMMITTED:
 394            raise AssertionError("Cannot duplicate committed buffer block")
 395        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
 396        bufferblock.append(self.buffer_view[0:self.size()])
 397        return bufferblock
 398
 399    @synchronized
 400    def clear(self):
 401        self._state = _BufferBlock.DELETED
 402        self.owner = None
 403        self.buffer_block = None
 404        self.buffer_view = None
 405
 406    @synchronized
 407    def repack_writes(self):
 408        """Optimize buffer block by repacking segments in file sequence.
 409
 410        When the client makes random writes, they appear in the buffer block in
 411        the sequence they were written rather than the sequence they appear in
 412        the file.  This makes for inefficient, fragmented manifests.  Attempt
 413        to optimize by repacking writes in file sequence.
 414
 415        """
 416        if self._state != _BufferBlock.WRITABLE:
 417            raise AssertionError("Cannot repack non-writable block")
 418
 419        segs = self.owner.segments()
 420
 421        # Collect the segments that reference the buffer block.
 422        bufferblock_segs = [s for s in segs if s.locator == self.blockid]
 423
 424        # Collect total data referenced by segments (could be smaller than
 425        # bufferblock size if a portion of the file was written and
 426        # then overwritten).
 427        write_total = sum([s.range_size for s in bufferblock_segs])
 428
 429        if write_total < self.size() or len(bufferblock_segs) > 1:
 430            # If there's more than one segment referencing this block, it is
 431            # due to out-of-order writes and will produce a fragmented
 432            # manifest, so try to optimize by re-packing into a new buffer.
 433            contents = self.buffer_view[0:self.write_pointer].tobytes()
 434            new_bb = _BufferBlock(None, write_total, None)
 435            for t in bufferblock_segs:
 436                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
 437                t.segment_offset = new_bb.size() - t.range_size
 438
 439            self.buffer_block = new_bb.buffer_block
 440            self.buffer_view = new_bb.buffer_view
 441            self.write_pointer = new_bb.write_pointer
 442            self._locator = None
 443            new_bb.clear()
 444            self.owner.set_segments(segs)
 445
 446    def __repr__(self):
 447        return "<BufferBlock %s>" % (self.blockid)
 448
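An editor-added sketch of the state machine described above, with no Keep I/O (the COMMITTED locator below is a made-up placeholder):

    bb = _BufferBlock('bufferblock0', starting_capacity=8, owner=None)
    bb.append(b'hello ')
    bb.append(b'world')                       # buffer capacity doubles as needed
    assert bb.size() == 11
    assert bb.locator().endswith('+11')       # md5-of-contents + size
    bb.set_state(_BufferBlock.PENDING)        # appends are now an error
    bb.set_state(_BufferBlock.COMMITTED, 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx+11')
    assert bb.state() == _BufferBlock.COMMITTED and bb.buffer_block is None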
 449
 450class NoopLock(object):
 451    def __enter__(self):
 452        return self
 453
 454    def __exit__(self, exc_type, exc_value, traceback):
 455        pass
 456
 457    def acquire(self, blocking=False):
 458        pass
 459
 460    def release(self):
 461        pass
 462
 463
 464def must_be_writable(orig_func):
 465    @functools.wraps(orig_func)
 466    def must_be_writable_wrapper(self, *args, **kwargs):
 467        if not self.writable():
 468            raise IOError(errno.EROFS, "Collection is read-only.")
 469        return orig_func(self, *args, **kwargs)
 470    return must_be_writable_wrapper
 471
 472
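Like @synchronized, @must_be_writable is a thin guard: it consults the instance's writable() method before running the wrapped call. Editor-added sketch (not part of the module source):

    class _Doc(object):
        def __init__(self, writable):
            self._writable = writable

        def writable(self):
            return self._writable

        @must_be_writable
        def edit(self):
            return 'edited'

    assert _Doc(True).edit() == 'edited'
    # _Doc(False).edit() would raise IOError(errno.EROFS, "Collection is read-only.")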
 473class _BlockManager(object):
 474    """BlockManager handles buffer blocks.
 475
 476    Also handles background block uploads, and background block prefetch for a
 477    Collection of ArvadosFiles.
 478
 479    """
 480
 481    DEFAULT_PUT_THREADS = 2
 482
 483    def __init__(self, keep,
 484                 copies=None,
 485                 put_threads=None,
 486                 num_retries=None,
 487                 storage_classes_func=None):
 488        """keep: KeepClient object to use"""
 489        self._keep = keep
 490        self._bufferblocks = collections.OrderedDict()
 491        self._put_queue = None
 492        self._put_threads = None
 493        self.lock = threading.Lock()
 494        self.prefetch_lookahead = self._keep.num_prefetch_threads
 495        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
 496        self.copies = copies
 497        self.storage_classes = storage_classes_func or (lambda: [])
 498        self._pending_write_size = 0
 499        self.threads_lock = threading.Lock()
 500        self.padding_block = None
 501        self.num_retries = num_retries
 502
 503    @synchronized
 504    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
 505        """Allocate a new, empty bufferblock in WRITABLE state and return it.
 506
 507        :blockid:
 508          optional block identifier, otherwise one will be automatically assigned
 509
 510        :starting_capacity:
 511          optional capacity, otherwise will use default capacity
 512
 513        :owner:
 514          ArvadosFile that owns this block
 515
 516        """
 517        return self._alloc_bufferblock(blockid, starting_capacity, owner)
 518
 519    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
 520        if blockid is None:
 521            blockid = str(uuid.uuid4())
 522        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
 523        self._bufferblocks[bufferblock.blockid] = bufferblock
 524        return bufferblock
 525
 526    @synchronized
 527    def dup_block(self, block, owner):
 528        """Create a new bufferblock initialized with the content of an existing bufferblock.
 529
 530        :block:
 531          the buffer block to copy.
 532
 533        :owner:
 534          ArvadosFile that owns the new block
 535
 536        """
 537        new_blockid = str(uuid.uuid4())
 538        bufferblock = block.clone(new_blockid, owner)
 539        self._bufferblocks[bufferblock.blockid] = bufferblock
 540        return bufferblock
 541
 542    @synchronized
 543    def is_bufferblock(self, locator):
 544        return locator in self._bufferblocks
 545
 546    def _commit_bufferblock_worker(self):
 547        """Background uploader thread."""
 548
 549        while True:
 550            try:
 551                bufferblock = self._put_queue.get()
 552                if bufferblock is None:
 553                    return
 554
 555                if self.copies is None:
 556                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
 557                else:
 558                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
 559                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 560            except Exception as e:
 561                bufferblock.set_state(_BufferBlock.ERROR, e)
 562            finally:
 563                if self._put_queue is not None:
 564                    self._put_queue.task_done()
 565
 566    def start_put_threads(self):
 567        with self.threads_lock:
 568            if self._put_threads is None:
 569                # Start uploader threads.
 570
 571                # If we don't limit the Queue size, the upload queue can quickly
 572                # grow to take up gigabytes of RAM if the writing process is
 573                # generating data more quickly than it can be sent to the Keep
 574                # servers.
 575                #
 576                # With two upload threads and a queue size of 2, this means up to 4
 577                # blocks pending.  If they are full 64 MiB blocks, that means up to
 578                # 256 MiB of internal buffering, which is the same size as the
 579                # default download block cache in KeepClient.
 580                self._put_queue = queue.Queue(maxsize=2)
 581
 582                self._put_threads = []
 583                for i in range(0, self.num_put_threads):
 584                    thread = threading.Thread(target=self._commit_bufferblock_worker)
 585                    self._put_threads.append(thread)
 586                    thread.daemon = True
 587                    thread.start()
 588
 589    @synchronized
 590    def stop_threads(self):
 591        """Shut down and wait for background upload and download threads to finish."""
 592
 593        if self._put_threads is not None:
 594            for t in self._put_threads:
 595                self._put_queue.put(None)
 596            for t in self._put_threads:
 597                t.join()
 598        self._put_threads = None
 599        self._put_queue = None
 600
 601    def __enter__(self):
 602        return self
 603
 604    def __exit__(self, exc_type, exc_value, traceback):
 605        self.stop_threads()
 606
 607    @synchronized
 608    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
 609        """Packs small blocks together before uploading"""
 610
 611        self._pending_write_size += closed_file_size
 612
  613        # Check if there are enough small blocks to fill up one full block
 614        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
 615            return
 616
  617        # Find blocks that are ready to be packed together before being
  618        # committed to Keep.
  619        # A WRITABLE block always has an owner.
  620        # A WRITABLE block whose owner is closed() is guaranteed to be
  621        # no larger than KEEP_BLOCK_SIZE/2.
 622        try:
 623            small_blocks = [b for b in listvalues(self._bufferblocks)
 624                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
 625        except AttributeError:
 626            # Writable blocks without owner shouldn't exist.
 627            raise UnownedBlockError()
 628
 629        if len(small_blocks) <= 1:
 630            # Not enough small blocks for repacking
 631            return
 632
 633        for bb in small_blocks:
 634            bb.repack_writes()
 635
 636        # Update the pending write size count with its true value, just in case
 637        # some small file was opened, written and closed several times.
 638        self._pending_write_size = sum([b.size() for b in small_blocks])
 639
 640        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
 641            return
 642
 643        new_bb = self._alloc_bufferblock()
 644        new_bb.owner = []
 645        files = []
 646        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
 647            bb = small_blocks.pop(0)
 648            new_bb.owner.append(bb.owner)
 649            self._pending_write_size -= bb.size()
 650            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
 651            files.append((bb, new_bb.write_pointer - bb.size()))
 652
 653        self.commit_bufferblock(new_bb, sync=sync)
 654
 655        for bb, new_bb_segment_offset in files:
 656            newsegs = bb.owner.segments()
 657            for s in newsegs:
 658                if s.locator == bb.blockid:
 659                    s.locator = new_bb.blockid
 660                    s.segment_offset = new_bb_segment_offset+s.segment_offset
 661            bb.owner.set_segments(newsegs)
 662            self._delete_bufferblock(bb.blockid)
 663
 664    def commit_bufferblock(self, block, sync):
 665        """Initiate a background upload of a bufferblock.
 666
 667        :block:
 668          The block object to upload
 669
 670        :sync:
 671          If `sync` is True, upload the block synchronously.
 672          If `sync` is False, upload the block asynchronously.  This will
 673          return immediately unless the upload queue is at capacity, in
 674          which case it will wait on an upload queue slot.
 675
 676        """
 677        try:
  678            # Mark the block as PENDING to disallow any more appends.
 679            block.set_state(_BufferBlock.PENDING)
 680        except StateChangeError as e:
 681            if e.state == _BufferBlock.PENDING:
 682                if sync:
 683                    block.wait_for_commit.wait()
 684                else:
 685                    return
 686            if block.state() == _BufferBlock.COMMITTED:
 687                return
 688            elif block.state() == _BufferBlock.ERROR:
 689                raise block.error
 690            else:
 691                raise
 692
 693        if sync:
 694            try:
 695                if self.copies is None:
 696                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
 697                else:
 698                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
 699                block.set_state(_BufferBlock.COMMITTED, loc)
 700            except Exception as e:
 701                block.set_state(_BufferBlock.ERROR, e)
 702                raise
 703        else:
 704            self.start_put_threads()
 705            self._put_queue.put(block)
 706
 707    @synchronized
 708    def get_bufferblock(self, locator):
 709        return self._bufferblocks.get(locator)
 710
 711    @synchronized
 712    def get_padding_block(self):
 713        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
 714        when using truncate() to extend the size of a file.
 715
 716        For reference (and possible future optimization), the md5sum of the
 717        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
 718
 719        """
 720
 721        if self.padding_block is None:
 722            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
 723            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
 724            self.commit_bufferblock(self.padding_block, False)
 725        return self.padding_block
 726
 727    @synchronized
 728    def delete_bufferblock(self, locator):
 729        self._delete_bufferblock(locator)
 730
 731    def _delete_bufferblock(self, locator):
 732        if locator in self._bufferblocks:
 733            bb = self._bufferblocks[locator]
 734            bb.clear()
 735            del self._bufferblocks[locator]
 736
 737    def get_block_contents(self, locator, num_retries, cache_only=False):
 738        """Fetch a block.
 739
  740        First checks to see if the locator is a BufferBlock and returns it if so;
  741        otherwise, passes the request through to KeepClient.get().
 742
 743        """
 744        with self.lock:
 745            if locator in self._bufferblocks:
 746                bufferblock = self._bufferblocks[locator]
 747                if bufferblock.state() != _BufferBlock.COMMITTED:
 748                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
 749                else:
 750                    locator = bufferblock._locator
 751        if cache_only:
 752            return self._keep.get_from_cache(locator)
 753        else:
 754            return self._keep.get(locator, num_retries=num_retries)
 755
 756    def commit_all(self):
 757        """Commit all outstanding buffer blocks.
 758
 759        This is a synchronous call, and will not return until all buffer blocks
 760        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 761
 762        """
 763        self.repack_small_blocks(force=True, sync=True)
 764
 765        with self.lock:
 766            items = listitems(self._bufferblocks)
 767
 768        for k,v in items:
 769            if v.state() != _BufferBlock.COMMITTED and v.owner:
  770                # Skip blocks owned by a list of files: if they're not in COMMITTED
  771                # state, they're already being committed asynchronously.
 772                if isinstance(v.owner, ArvadosFile):
 773                    v.owner.flush(sync=False)
 774
 775        with self.lock:
 776            if self._put_queue is not None:
 777                self._put_queue.join()
 778
 779                err = []
 780                for k,v in items:
 781                    if v.state() == _BufferBlock.ERROR:
 782                        err.append((v.locator(), v.error))
 783                if err:
 784                    raise KeepWriteError("Error writing some blocks", err, label="block")
 785
 786        for k,v in items:
 787            # flush again with sync=True to remove committed bufferblocks from
 788            # the segments.
 789            if v.owner:
 790                if isinstance(v.owner, ArvadosFile):
 791                    v.owner.flush(sync=True)
 792                elif isinstance(v.owner, list) and len(v.owner) > 0:
 793                    # This bufferblock is referenced by many files as a result
 794                    # of repacking small blocks, so don't delete it when flushing
 795                    # its owners, just do it after flushing them all.
 796                    for owner in v.owner:
 797                        owner.flush(sync=True)
 798                    self.delete_bufferblock(k)
 799
 800        self.stop_threads()
 801
 802    def block_prefetch(self, locator):
 803        """Initiate a background download of a block.
 804        """
 805
 806        if not self.prefetch_lookahead:
 807            return
 808
 809        with self.lock:
 810            if locator in self._bufferblocks:
 811                return
 812
 813        self._keep.block_prefetch(locator)
 814
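An editor-added sketch of the synchronous commit path, using a stub in place of a real KeepClient (the stub and its locator format are illustrative assumptions, not SDK API):

    class _StubKeep(object):
        num_prefetch_threads = 0                     # disables prefetch

        def put(self, data, num_retries=None, copies=None, classes=None):
            # Pretend to store the block and hand back a Keep-style locator.
            return "%s+%d" % (hashlib.md5(data).hexdigest(), len(data))

    bm = _BlockManager(_StubKeep())
    bb = bm.alloc_bufferblock()
    bb.append(b'some data')
    bm.commit_bufferblock(bb, sync=True)             # calls _StubKeep.put()
    assert bb.state() == _BufferBlock.COMMITTED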
 815
 816class ArvadosFile(object):
 817    """Represent a file in a Collection.
 818
 819    ArvadosFile manages the underlying representation of a file in Keep as a
 820    sequence of segments spanning a set of blocks, and implements random
 821    read/write access.
 822
 823    This object may be accessed from multiple threads.
 824
 825    """
 826
 827    __slots__ = ('parent', 'name', '_writers', '_committed',
 828                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 829
 830    def __init__(self, parent, name, stream=[], segments=[]):
 831        """
 832        ArvadosFile constructor.
 833
 834        :stream:
 835          a list of Range objects representing a block stream
 836
 837        :segments:
 838          a list of Range objects representing segments
 839        """
 840        self.parent = parent
 841        self.name = name
 842        self._writers = set()
 843        self._committed = False
 844        self._segments = []
 845        self.lock = parent.root_collection().lock
 846        for s in segments:
 847            self._add_segment(stream, s.locator, s.range_size)
 848        self._current_bblock = None
 849        self._read_counter = 0
 850
 851    def writable(self):
 852        return self.parent.writable()
 853
 854    @synchronized
 855    def permission_expired(self, as_of_dt=None):
 856        """Returns True if any of the segment's locators is expired"""
 857        for r in self._segments:
 858            if KeepLocator(r.locator).permission_expired(as_of_dt):
 859                return True
 860        return False
 861
 862    @synchronized
 863    def has_remote_blocks(self):
 864        """Returns True if any of the segment's locators has a +R signature"""
 865
 866        for s in self._segments:
 867            if '+R' in s.locator:
 868                return True
 869        return False
 870
 871    @synchronized
 872    def _copy_remote_blocks(self, remote_blocks={}):
 873        """Ask Keep to copy remote blocks and point to their local copies.
 874
 875        This is called from the parent Collection.
 876
 877        :remote_blocks:
 878            Shared cache of remote to local block mappings. This is used to avoid
 879            doing extra work when blocks are shared by more than one file in
 880            different subdirectories.
 881        """
 882
 883        for s in self._segments:
 884            if '+R' in s.locator:
 885                try:
 886                    loc = remote_blocks[s.locator]
 887                except KeyError:
 888                    loc = self.parent._my_keep().refresh_signature(s.locator)
 889                    remote_blocks[s.locator] = loc
 890                s.locator = loc
 891                self.parent.set_committed(False)
 892        return remote_blocks
 893
 894    @synchronized
 895    def segments(self):
 896        return copy.copy(self._segments)
 897
 898    @synchronized
 899    def clone(self, new_parent, new_name):
 900        """Make a copy of this file."""
 901        cp = ArvadosFile(new_parent, new_name)
 902        cp.replace_contents(self)
 903        return cp
 904
 905    @must_be_writable
 906    @synchronized
 907    def replace_contents(self, other):
 908        """Replace segments of this file with segments from another `ArvadosFile` object."""
 909
 910        map_loc = {}
 911        self._segments = []
 912        for other_segment in other.segments():
 913            new_loc = other_segment.locator
 914            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
 915                if other_segment.locator not in map_loc:
 916                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
 917                    if bufferblock.state() != _BufferBlock.WRITABLE:
 918                        map_loc[other_segment.locator] = bufferblock.locator()
 919                    else:
 920                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
 921                new_loc = map_loc[other_segment.locator]
 922
 923            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 924
 925        self.set_committed(False)
 926
 927    def __eq__(self, other):
 928        if other is self:
 929            return True
 930        if not isinstance(other, ArvadosFile):
 931            return False
 932
 933        othersegs = other.segments()
 934        with self.lock:
 935            if len(self._segments) != len(othersegs):
 936                return False
 937            for i in range(0, len(othersegs)):
 938                seg1 = self._segments[i]
 939                seg2 = othersegs[i]
 940                loc1 = seg1.locator
 941                loc2 = seg2.locator
 942
 943                if self.parent._my_block_manager().is_bufferblock(loc1):
 944                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
 945
 946                if other.parent._my_block_manager().is_bufferblock(loc2):
 947                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
 948
 949                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
 950                    seg1.range_start != seg2.range_start or
 951                    seg1.range_size != seg2.range_size or
 952                    seg1.segment_offset != seg2.segment_offset):
 953                    return False
 954
 955        return True
 956
 957    def __ne__(self, other):
 958        return not self.__eq__(other)
 959
 960    @synchronized
 961    def set_segments(self, segs):
 962        self._segments = segs
 963
 964    @synchronized
 965    def set_committed(self, value=True):
 966        """Set committed flag.
 967
 968        If value is True, set committed to be True.
 969
 970        If value is False, set committed to be False for this and all parents.
 971        """
 972        if value == self._committed:
 973            return
 974        self._committed = value
 975        if self._committed is False and self.parent is not None:
 976            self.parent.set_committed(False)
 977
 978    @synchronized
 979    def committed(self):
 980        """Get whether this is committed or not."""
 981        return self._committed
 982
 983    @synchronized
 984    def add_writer(self, writer):
 985        """Add an ArvadosFileWriter reference to the list of writers"""
 986        if isinstance(writer, ArvadosFileWriter):
 987            self._writers.add(writer)
 988
 989    @synchronized
 990    def remove_writer(self, writer, flush):
 991        """
 992        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 993        and do some block maintenance tasks.
 994        """
 995        self._writers.remove(writer)
 996
 997        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 998            # File writer closed, not small enough for repacking
 999            self.flush()
1000        elif self.closed():
1001            # All writers closed and size is adequate for repacking
1002            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1003
1004    def closed(self):
1005        """
 1006        Get whether this is closed or not. When the writers list is empty, the file
 1007        is considered closed.
1008        """
1009        return len(self._writers) == 0
1010
1011    @must_be_writable
1012    @synchronized
1013    def truncate(self, size):
1014        """Shrink or expand the size of the file.
1015
1016        If `size` is less than the size of the file, the file contents after
1017        `size` will be discarded.  If `size` is greater than the current size
1018        of the file, it will be filled with zero bytes.
1019
1020        """
1021        if size < self.size():
1022            new_segs = []
1023            for r in self._segments:
1024                range_end = r.range_start+r.range_size
1025                if r.range_start >= size:
 1026                    # segment is past the truncate size, all done
1027                    break
1028                elif size < range_end:
1029                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1030                    nr.segment_offset = r.segment_offset
1031                    new_segs.append(nr)
1032                    break
1033                else:
1034                    new_segs.append(r)
1035
1036            self._segments = new_segs
1037            self.set_committed(False)
1038        elif size > self.size():
1039            padding = self.parent._my_block_manager().get_padding_block()
1040            diff = size - self.size()
1041            while diff > config.KEEP_BLOCK_SIZE:
1042                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1043                diff -= config.KEEP_BLOCK_SIZE
1044            if diff > 0:
1045                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1046            self.set_committed(False)
1047        else:
1048            # size == self.size()
1049            pass
1050
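For the growth case above, the padding arithmetic works out as follows (editor-added sketch; KEEP_BLOCK_SIZE is 64 MiB in the default config):

    # Growing a file by 150 MiB appends two full 64 MiB padding segments
    # followed by one 22 MiB segment, all pointing at the shared padding block.
    diff = 150 * 2**20
    full_blocks, remainder = divmod(diff, 64 * 2**20)
    assert (full_blocks, remainder) == (2, 22 * 2**20)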
1051    def readfrom(self, offset, size, num_retries, exact=False):
1052        """Read up to `size` bytes from the file starting at `offset`.
1053
1054        :exact:
1055         If False (default), return less data than requested if the read
1056         crosses a block boundary and the next block isn't cached.  If True,
1057         only return less data than requested when hitting EOF.
1058        """
1059
1060        with self.lock:
1061            if size == 0 or offset >= self.size():
1062                return b''
1063            readsegs = locators_and_ranges(self._segments, offset, size)
1064
1065            prefetch = None
1066            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1067            if prefetch_lookahead:
1068                # Doing prefetch on every read() call is surprisingly expensive
1069                # when we're trying to deliver data at 600+ MiBps and want
1070                # the read() fast path to be as lightweight as possible.
1071                #
1072                # Only prefetching every 128 read operations
1073                # dramatically reduces the overhead while still
1074                # getting the benefit of prefetching (e.g. when
1075                # reading 128 KiB at a time, it checks for prefetch
1076                # every 16 MiB).
1077                self._read_counter = (self._read_counter+1) % 128
1078                if self._read_counter == 1:
1079                    prefetch = locators_and_ranges(self._segments,
1080                                                   offset + size,
1081                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1082                                                   limit=(1+prefetch_lookahead))
1083
1084        locs = set()
1085        data = []
1086        for lr in readsegs:
1087            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1088            if block:
1089                blockview = memoryview(block)
1090                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1091                locs.add(lr.locator)
1092            else:
1093                break
1094
1095        if prefetch:
1096            for lr in prefetch:
1097                if lr.locator not in locs:
1098                    self.parent._my_block_manager().block_prefetch(lr.locator)
1099                    locs.add(lr.locator)
1100
1101        if len(data) == 1:
1102            return data[0]
1103        else:
1104            return b''.join(data)
1105
1106    @must_be_writable
1107    @synchronized
1108    def writeto(self, offset, data, num_retries):
1109        """Write `data` to the file starting at `offset`.
1110
1111        This will update existing bytes and/or extend the size of the file as
1112        necessary.
1113
1114        """
1115        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1116            data = data.encode()
1117        if len(data) == 0:
1118            return
1119
1120        if offset > self.size():
1121            self.truncate(offset)
1122
1123        if len(data) > config.KEEP_BLOCK_SIZE:
1124            # Chunk it up into smaller writes
1125            n = 0
1126            dataview = memoryview(data)
1127            while n < len(data):
1128                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1129                n += config.KEEP_BLOCK_SIZE
1130            return
1131
1132        self.set_committed(False)
1133
1134        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1135            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1136
1137        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1138            self._current_bblock.repack_writes()
1139            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1140                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1141                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1142
1143        self._current_bblock.append(data)
1144
1145        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1146
1147        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1148
1149        return len(data)
1150
1151    @synchronized
1152    def flush(self, sync=True, num_retries=0):
1153        """Flush the current bufferblock to Keep.
1154
1155        :sync:
1156          If True, commit block synchronously, wait until buffer block has been written.
1157          If False, commit block asynchronously, return immediately after putting block into
1158          the keep put queue.
1159        """
1160        if self.committed():
1161            return
1162
1163        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1164            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1165                self._current_bblock.repack_writes()
1166            if self._current_bblock.state() != _BufferBlock.DELETED:
1167                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1168
1169        if sync:
1170            to_delete = set()
1171            for s in self._segments:
1172                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1173                if bb:
1174                    if bb.state() != _BufferBlock.COMMITTED:
1175                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1176                    to_delete.add(s.locator)
1177                    s.locator = bb.locator()
1178            for s in to_delete:
1179                # Don't delete the bufferblock if it's owned by many files. It'll be
1180                # deleted after all of its owners are flush()ed.
1181                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1182                    self.parent._my_block_manager().delete_bufferblock(s)
1183
1184        self.parent.notify(MOD, self.parent, self.name, (self, self))
1185
1186    @must_be_writable
1187    @synchronized
1188    def add_segment(self, blocks, pos, size):
1189        """Add a segment to the end of the file.
1190
 1191        `pos` and `size` reference a section of the stream described by
1192        `blocks` (a list of Range objects)
1193
1194        """
1195        self._add_segment(blocks, pos, size)
1196
1197    def _add_segment(self, blocks, pos, size):
1198        """Internal implementation of add_segment."""
1199        self.set_committed(False)
1200        for lr in locators_and_ranges(blocks, pos, size):
1201            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1202            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1203            self._segments.append(r)
1204
1205    @synchronized
1206    def size(self):
1207        """Get the file size."""
1208        if self._segments:
1209            n = self._segments[-1]
1210            return n.range_start + n.range_size
1211        else:
1212            return 0
1213
1214    @synchronized
1215    def manifest_text(self, stream_name=".", portable_locators=False,
1216                      normalize=False, only_committed=False):
1217        buf = ""
1218        filestream = []
1219        for segment in self._segments:
1220            loc = segment.locator
1221            if self.parent._my_block_manager().is_bufferblock(loc):
1222                if only_committed:
1223                    continue
1224                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1225            if portable_locators:
1226                loc = KeepLocator(loc).stripped()
1227            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1228                                 segment.segment_offset, segment.range_size))
1229        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1230        buf += "\n"
1231        return buf
1232
1233    @must_be_writable
1234    @synchronized
1235    def _reparent(self, newparent, newname):
1236        self.set_committed(False)
1237        self.flush(sync=True)
1238        self.parent.remove(self.name)
1239        self.parent = newparent
1240        self.name = newname
1241        self.lock = self.parent.root_collection().lock
1242
1243
1244class ArvadosFileReader(ArvadosFileReaderBase):
1245    """Wraps ArvadosFile in a file-like object supporting reading only.
1246
1247    Be aware that this class is NOT thread safe as there is no locking around
1248    updating file pointer.
1249
1250    """
1251
1252    def __init__(self, arvadosfile, mode="r", num_retries=None):
1253        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1254        self.arvadosfile = arvadosfile
1255
1256    def size(self):
1257        return self.arvadosfile.size()
1258
1259    def stream_name(self):
1260        return self.arvadosfile.parent.stream_name()
1261
1262    def readinto(self, b):
1263        data = self.read(len(b))
1264        b[:len(data)] = data
1265        return len(data)
1266
1267    @_FileLikeObjectBase._before_close
1268    @retry_method
1269    def read(self, size=None, num_retries=None):
1270        """Read up to `size` bytes from the file and return the result.
1271
1272        Starts at the current file position.  If `size` is None, read the
1273        entire remainder of the file.
1274        """
1275        if size is None:
1276            data = []
1277            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1278            while rd:
1279                data.append(rd)
1280                self._filepos += len(rd)
1281                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1282            return b''.join(data)
1283        else:
1284            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1285            self._filepos += len(data)
1286            return data
1287
1288    @_FileLikeObjectBase._before_close
1289    @retry_method
1290    def readfrom(self, offset, size, num_retries=None):
1291        """Read up to `size` bytes from the stream, starting at the specified file offset.
1292
1293        This method does not change the file position.
1294        """
1295        return self.arvadosfile.readfrom(offset, size, num_retries)
1296
1297    def flush(self):
1298        pass
1299
1300
1301class ArvadosFileWriter(ArvadosFileReader):
1302    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1303
1304    Be aware that this class is NOT thread safe as there is no locking around
1305    updating file pointer.
1306
1307    """
1308
1309    def __init__(self, arvadosfile, mode, num_retries=None):
1310        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1311        self.arvadosfile.add_writer(self)
1312
1313    def writable(self):
1314        return True
1315
1316    @_FileLikeObjectBase._before_close
1317    @retry_method
1318    def write(self, data, num_retries=None):
1319        if self.mode[0] == "a":
1320            self._filepos = self.size()
1321        self.arvadosfile.writeto(self._filepos, data, num_retries)
1322        self._filepos += len(data)
1323        return len(data)
1324
1325    @_FileLikeObjectBase._before_close
1326    @retry_method
1327    def writelines(self, seq, num_retries=None):
1328        for s in seq:
1329            self.write(s, num_retries=num_retries)
1330
1331    @_FileLikeObjectBase._before_close
1332    def truncate(self, size=None):
1333        if size is None:
1334            size = self._filepos
1335        self.arvadosfile.truncate(size)
1336
1337    @_FileLikeObjectBase._before_close
1338    def flush(self):
1339        self.arvadosfile.flush()
1340
1341    def close(self, flush=True):
1342        if not self.closed:
1343            self.arvadosfile.remove_writer(self, flush)
1344            super(ArvadosFileWriter, self).close()
1345
1346
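An editor-added usage sketch (assumes a configured Arvados client and the arvados.collection.Collection API; the file name is arbitrary and the exact wrapper types returned by open() vary by SDK version):

    import arvados.collection

    c = arvados.collection.Collection()
    with c.open('notes.txt', 'wb') as f:     # backed by an ArvadosFileWriter
        f.write(b'hello\n')
    with c.open('notes.txt', 'rb') as f:     # backed by an ArvadosFileReader
        assert f.read() == b'hello\n'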
1347class WrappableFile(object):
1348    """An interface to an Arvados file that's compatible with io wrappers.
1349
1350    """
1351    def __init__(self, f):
1352        self.f = f
1353        self.closed = False
1354    def close(self):
1355        self.closed = True
1356        return self.f.close()
1357    def flush(self):
1358        return self.f.flush()
1359    def read(self, *args, **kwargs):
1360        return self.f.read(*args, **kwargs)
1361    def readable(self):
1362        return self.f.readable()
1363    def readinto(self, *args, **kwargs):
1364        return self.f.readinto(*args, **kwargs)
1365    def seek(self, *args, **kwargs):
1366        return self.f.seek(*args, **kwargs)
1367    def seekable(self):
1368        return self.f.seekable()
1369    def tell(self):
1370        return self.f.tell()
1371    def writable(self):
1372        return self.f.writable()
1373    def write(self, *args, **kwargs):
1374        return self.f.write(*args, **kwargs)
112        if pos < 0:
113            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
114        self._filepos = pos
115        return self._filepos
def tell(self):
117    def tell(self):
118        return self._filepos
def readable(self):
120    def readable(self):
121        return True
def writable(self):
123    def writable(self):
124        return False
def seekable(self):
126    def seekable(self):
127        return True
@retry_method
def readall(self, size=1048576, num_retries=None):
129    @_FileLikeObjectBase._before_close
130    @retry_method
131    def readall(self, size=2**20, num_retries=None):
132        while True:
133            data = self.read(size, num_retries=num_retries)
134            if len(data) == 0:
135                break
136            yield data
@retry_method
def readline(self, size=inf, num_retries=None):
138    @_FileLikeObjectBase._before_close
139    @retry_method
140    def readline(self, size=float('inf'), num_retries=None):
141        cache_pos, cache_data = self._readline_cache
142        if self.tell() == cache_pos:
143            data = [cache_data]
144            self._filepos += len(cache_data)
145        else:
146            data = [b'']
147        data_size = len(data[-1])
148        while (data_size < size) and (b'\n' not in data[-1]):
149            next_read = self.read(2 ** 20, num_retries=num_retries)
150            if not next_read:
151                break
152            data.append(next_read)
153            data_size += len(next_read)
154        data = b''.join(data)
155        try:
156            nextline_index = data.index(b'\n') + 1
157        except ValueError:
158            nextline_index = len(data)
159        nextline_index = min(nextline_index, size)
160        self._filepos -= len(data) - nextline_index
161        self._readline_cache = (self.tell(), data[nextline_index:])
162        return data[:nextline_index].decode()
@retry_method
def decompress(self, decompress, size, num_retries=None):
164    @_FileLikeObjectBase._before_close
165    @retry_method
166    def decompress(self, decompress, size, num_retries=None):
167        for segment in self.readall(size, num_retries=num_retries):
168            data = decompress(segment)
169            if data:
170                yield data
@retry_method
def readall_decompressed(self, size=1048576, num_retries=None):
172    @_FileLikeObjectBase._before_close
173    @retry_method
174    def readall_decompressed(self, size=2**20, num_retries=None):
175        self.seek(0)
176        if self.name.endswith('.bz2'):
177            dc = bz2.BZ2Decompressor()
178            return self.decompress(dc.decompress, size,
179                                   num_retries=num_retries)
180        elif self.name.endswith('.gz'):
181            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
182            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
183                                   size, num_retries=num_retries)
184        else:
185            return self.readall(size, num_retries=num_retries)
@retry_method
def readlines(self, sizehint=inf, num_retries=None):
187    @_FileLikeObjectBase._before_close
188    @retry_method
189    def readlines(self, sizehint=float('inf'), num_retries=None):
190        data = []
191        data_size = 0
192        for s in self.readall(num_retries=num_retries):
193            data.append(s)
194            data_size += len(s)
195            if data_size >= sizehint:
196                break
197        return b''.join(data).decode().splitlines(True)
def size(self):
199    def size(self):
200        raise IOError(errno.ENOSYS, "Not implemented")
def read(self, size, num_retries=None):
202    def read(self, size, num_retries=None):
203        raise IOError(errno.ENOSYS, "Not implemented")
def readfrom(self, start, size, num_retries=None):
205    def readfrom(self, start, size, num_retries=None):
206        raise IOError(errno.ENOSYS, "Not implemented")
class StreamFileReader(ArvadosFileReaderBase):
209class StreamFileReader(ArvadosFileReaderBase):
210    class _NameAttribute(str):
211        # The Python file API provides a plain .name attribute.
212        # Older SDK provided a name() method.
213        # This class provides both, for maximum compatibility.
214        def __call__(self):
215            return self
216
217    def __init__(self, stream, segments, name):
218        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
219        self._stream = stream
220        self.segments = segments
221
222    def stream_name(self):
223        return self._stream.name()
224
225    def size(self):
226        n = self.segments[-1]
227        return n.range_start + n.range_size
228
229    @_FileLikeObjectBase._before_close
230    @retry_method
231    def read(self, size, num_retries=None):
232        """Read up to 'size' bytes from the stream, starting at the current file position"""
233        if size == 0:
234            return b''
235
236        data = b''
237        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
238        if available_chunks:
239            lr = available_chunks[0]
240            data = self._stream.readfrom(lr.locator+lr.segment_offset,
241                                         lr.segment_size,
242                                         num_retries=num_retries)
243
244        self._filepos += len(data)
245        return data
246
247    @_FileLikeObjectBase._before_close
248    @retry_method
249    def readfrom(self, start, size, num_retries=None):
250        """Read up to 'size' bytes from the stream, starting at 'start'"""
251        if size == 0:
252            return b''
253
254        data = []
255        for lr in locators_and_ranges(self.segments, start, size):
256            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
257                                              num_retries=num_retries))
258        return b''.join(data)
259
260    def as_manifest(self):
261        segs = []
262        for r in self.segments:
263            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
264        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
StreamFileReader(stream, segments, name)
217    def __init__(self, stream, segments, name):
218        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
219        self._stream = stream
220        self.segments = segments
segments
def stream_name(self):
222    def stream_name(self):
223        return self._stream.name()
def size(self):
225    def size(self):
226        n = self.segments[-1]
227        return n.range_start + n.range_size
@retry_method
def read(self, size, num_retries=None):
229    @_FileLikeObjectBase._before_close
230    @retry_method
231    def read(self, size, num_retries=None):
232        """Read up to 'size' bytes from the stream, starting at the current file position"""
233        if size == 0:
234            return b''
235
236        data = b''
237        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
238        if available_chunks:
239            lr = available_chunks[0]
240            data = self._stream.readfrom(lr.locator+lr.segment_offset,
241                                         lr.segment_size,
242                                         num_retries=num_retries)
243
244        self._filepos += len(data)
245        return data

Read up to ‘size’ bytes from the stream, starting at the current file position

@retry_method
def readfrom(self, start, size, num_retries=None):
247    @_FileLikeObjectBase._before_close
248    @retry_method
249    def readfrom(self, start, size, num_retries=None):
250        """Read up to 'size' bytes from the stream, starting at 'start'"""
251        if size == 0:
252            return b''
253
254        data = []
255        for lr in locators_and_ranges(self.segments, start, size):
256            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
257                                              num_retries=num_retries))
258        return b''.join(data)

Read up to ‘size’ bytes from the stream, starting at ‘start’

def as_manifest(self):
260    def as_manifest(self):
261        segs = []
262        for r in self.segments:
263            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
264        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
def synchronized(orig_func):
267def synchronized(orig_func):
268    @functools.wraps(orig_func)
269    def synchronized_wrapper(self, *args, **kwargs):
270        with self.lock:
271            return orig_func(self, *args, **kwargs)
272    return synchronized_wrapper
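
A minimal sketch of how the decorator is used: the wrapped method body runs while holding the instance's lock attribute. The Tally class below is a hypothetical illustration, not part of the SDK.

    import threading
    from arvados.arvfile import synchronized

    class Tally(object):
        def __init__(self):
            self.lock = threading.RLock()   # synchronized acquires self.lock
            self._count = 0

        @synchronized
        def increment(self):
            # Executes inside "with self.lock:", so concurrent callers
            # cannot interleave this read-modify-write.
            self._count += 1
            return self._count
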
class StateChangeError(builtins.Exception):
275class StateChangeError(Exception):
276    def __init__(self, message, state, nextstate):
277        super(StateChangeError, self).__init__(message)
278        self.state = state
279        self.nextstate = nextstate

Common base class for all non-exit exceptions.

StateChangeError(message, state, nextstate)
276    def __init__(self, message, state, nextstate):
277        super(StateChangeError, self).__init__(message)
278        self.state = state
279        self.nextstate = nextstate
state
nextstate
Inherited Members
builtins.BaseException
with_traceback
args
class NoopLock:
451class NoopLock(object):
452    def __enter__(self):
453        return self
454
455    def __exit__(self, exc_type, exc_value, traceback):
456        pass
457
458    def acquire(self, blocking=False):
459        pass
460
461    def release(self):
462        pass
def acquire(self, blocking=False):
458    def acquire(self, blocking=False):
459        pass
def release(self):
461    def release(self):
462        pass
def must_be_writable(orig_func):
465def must_be_writable(orig_func):
466    @functools.wraps(orig_func)
467    def must_be_writable_wrapper(self, *args, **kwargs):
468        if not self.writable():
469            raise IOError(errno.EROFS, "Collection is read-only.")
470        return orig_func(self, *args, **kwargs)
471    return must_be_writable_wrapper
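
A minimal sketch of the contract this decorator enforces: the wrapped method only runs when self.writable() returns True; otherwise the call raises IOError with errno.EROFS. The Note class below is a hypothetical illustration, not part of the SDK.

    from arvados.arvfile import must_be_writable

    class Note(object):
        def __init__(self, read_only=False):
            self._read_only = read_only

        def writable(self):
            return not self._read_only

        @must_be_writable
        def set_text(self, text):
            # Only reached when writable() returned True.
            self.text = text

    Note(read_only=True).set_text('x')   # raises IOError: Collection is read-only.
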
class ArvadosFile:
 817class ArvadosFile(object):
 818    """Represent a file in a Collection.
 819
 820    ArvadosFile manages the underlying representation of a file in Keep as a
 821    sequence of segments spanning a set of blocks, and implements random
 822    read/write access.
 823
 824    This object may be accessed from multiple threads.
 825
 826    """
 827
 828    __slots__ = ('parent', 'name', '_writers', '_committed',
 829                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 830
 831    def __init__(self, parent, name, stream=[], segments=[]):
 832        """
 833        ArvadosFile constructor.
 834
 835        :stream:
 836          a list of Range objects representing a block stream
 837
 838        :segments:
 839          a list of Range objects representing segments
 840        """
 841        self.parent = parent
 842        self.name = name
 843        self._writers = set()
 844        self._committed = False
 845        self._segments = []
 846        self.lock = parent.root_collection().lock
 847        for s in segments:
 848            self._add_segment(stream, s.locator, s.range_size)
 849        self._current_bblock = None
 850        self._read_counter = 0
 851
 852    def writable(self):
 853        return self.parent.writable()
 854
 855    @synchronized
 856    def permission_expired(self, as_of_dt=None):
 857        """Returns True if any of the segment's locators is expired"""
 858        for r in self._segments:
 859            if KeepLocator(r.locator).permission_expired(as_of_dt):
 860                return True
 861        return False
 862
 863    @synchronized
 864    def has_remote_blocks(self):
 865        """Returns True if any of the segment's locators has a +R signature"""
 866
 867        for s in self._segments:
 868            if '+R' in s.locator:
 869                return True
 870        return False
 871
 872    @synchronized
 873    def _copy_remote_blocks(self, remote_blocks={}):
 874        """Ask Keep to copy remote blocks and point to their local copies.
 875
 876        This is called from the parent Collection.
 877
 878        :remote_blocks:
 879            Shared cache of remote to local block mappings. This is used to avoid
 880            doing extra work when blocks are shared by more than one file in
 881            different subdirectories.
 882        """
 883
 884        for s in self._segments:
 885            if '+R' in s.locator:
 886                try:
 887                    loc = remote_blocks[s.locator]
 888                except KeyError:
 889                    loc = self.parent._my_keep().refresh_signature(s.locator)
 890                    remote_blocks[s.locator] = loc
 891                s.locator = loc
 892                self.parent.set_committed(False)
 893        return remote_blocks
 894
 895    @synchronized
 896    def segments(self):
 897        return copy.copy(self._segments)
 898
 899    @synchronized
 900    def clone(self, new_parent, new_name):
 901        """Make a copy of this file."""
 902        cp = ArvadosFile(new_parent, new_name)
 903        cp.replace_contents(self)
 904        return cp
 905
 906    @must_be_writable
 907    @synchronized
 908    def replace_contents(self, other):
 909        """Replace segments of this file with segments from another `ArvadosFile` object."""
 910
 911        map_loc = {}
 912        self._segments = []
 913        for other_segment in other.segments():
 914            new_loc = other_segment.locator
 915            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
 916                if other_segment.locator not in map_loc:
 917                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
 918                    if bufferblock.state() != _BufferBlock.WRITABLE:
 919                        map_loc[other_segment.locator] = bufferblock.locator()
 920                    else:
 921                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
 922                new_loc = map_loc[other_segment.locator]
 923
 924            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 925
 926        self.set_committed(False)
 927
 928    def __eq__(self, other):
 929        if other is self:
 930            return True
 931        if not isinstance(other, ArvadosFile):
 932            return False
 933
 934        othersegs = other.segments()
 935        with self.lock:
 936            if len(self._segments) != len(othersegs):
 937                return False
 938            for i in range(0, len(othersegs)):
 939                seg1 = self._segments[i]
 940                seg2 = othersegs[i]
 941                loc1 = seg1.locator
 942                loc2 = seg2.locator
 943
 944                if self.parent._my_block_manager().is_bufferblock(loc1):
 945                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
 946
 947                if other.parent._my_block_manager().is_bufferblock(loc2):
 948                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
 949
 950                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
 951                    seg1.range_start != seg2.range_start or
 952                    seg1.range_size != seg2.range_size or
 953                    seg1.segment_offset != seg2.segment_offset):
 954                    return False
 955
 956        return True
 957
 958    def __ne__(self, other):
 959        return not self.__eq__(other)
 960
 961    @synchronized
 962    def set_segments(self, segs):
 963        self._segments = segs
 964
 965    @synchronized
 966    def set_committed(self, value=True):
 967        """Set committed flag.
 968
 969        If value is True, set committed to be True.
 970
 971        If value is False, set committed to be False for this and all parents.
 972        """
 973        if value == self._committed:
 974            return
 975        self._committed = value
 976        if self._committed is False and self.parent is not None:
 977            self.parent.set_committed(False)
 978
 979    @synchronized
 980    def committed(self):
 981        """Get whether this is committed or not."""
 982        return self._committed
 983
 984    @synchronized
 985    def add_writer(self, writer):
 986        """Add an ArvadosFileWriter reference to the list of writers"""
 987        if isinstance(writer, ArvadosFileWriter):
 988            self._writers.add(writer)
 989
 990    @synchronized
 991    def remove_writer(self, writer, flush):
 992        """
 993        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 994        and do some block maintenance tasks.
 995        """
 996        self._writers.remove(writer)
 997
 998        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 999            # File writer closed, not small enough for repacking
1000            self.flush()
1001        elif self.closed():
1002            # All writers closed and size is adequate for repacking
1003            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1004
1005    def closed(self):
1006        """
1007        Get whether this is closed or not. When the writers list is empty, the file
1008        is supposed to be closed.
1009        """
1010        return len(self._writers) == 0
1011
1012    @must_be_writable
1013    @synchronized
1014    def truncate(self, size):
1015        """Shrink or expand the size of the file.
1016
1017        If `size` is less than the size of the file, the file contents after
1018        `size` will be discarded.  If `size` is greater than the current size
1019        of the file, it will be filled with zero bytes.
1020
1021        """
1022        if size < self.size():
1023            new_segs = []
1024            for r in self._segments:
1025                range_end = r.range_start+r.range_size
1026                if r.range_start >= size:
1027                    # segment is past the truncate size, all done
1028                    break
1029                elif size < range_end:
1030                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1031                    nr.segment_offset = r.segment_offset
1032                    new_segs.append(nr)
1033                    break
1034                else:
1035                    new_segs.append(r)
1036
1037            self._segments = new_segs
1038            self.set_committed(False)
1039        elif size > self.size():
1040            padding = self.parent._my_block_manager().get_padding_block()
1041            diff = size - self.size()
1042            while diff > config.KEEP_BLOCK_SIZE:
1043                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1044                diff -= config.KEEP_BLOCK_SIZE
1045            if diff > 0:
1046                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1047            self.set_committed(False)
1048        else:
1049            # size == self.size()
1050            pass
1051
1052    def readfrom(self, offset, size, num_retries, exact=False):
1053        """Read up to `size` bytes from the file starting at `offset`.
1054
1055        :exact:
1056         If False (default), return less data than requested if the read
1057         crosses a block boundary and the next block isn't cached.  If True,
1058         only return less data than requested when hitting EOF.
1059        """
1060
1061        with self.lock:
1062            if size == 0 or offset >= self.size():
1063                return b''
1064            readsegs = locators_and_ranges(self._segments, offset, size)
1065
1066            prefetch = None
1067            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1068            if prefetch_lookahead:
1069                # Doing prefetch on every read() call is surprisingly expensive
1070                # when we're trying to deliver data at 600+ MiBps and want
1071                # the read() fast path to be as lightweight as possible.
1072                #
1073                # Only prefetching every 128 read operations
1074                # dramatically reduces the overhead while still
1075                # getting the benefit of prefetching (e.g. when
1076                # reading 128 KiB at a time, it checks for prefetch
1077                # every 16 MiB).
1078                self._read_counter = (self._read_counter+1) % 128
1079                if self._read_counter == 1:
1080                    prefetch = locators_and_ranges(self._segments,
1081                                                   offset + size,
1082                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1083                                                   limit=(1+prefetch_lookahead))
1084
1085        locs = set()
1086        data = []
1087        for lr in readsegs:
1088            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1089            if block:
1090                blockview = memoryview(block)
1091                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1092                locs.add(lr.locator)
1093            else:
1094                break
1095
1096        if prefetch:
1097            for lr in prefetch:
1098                if lr.locator not in locs:
1099                    self.parent._my_block_manager().block_prefetch(lr.locator)
1100                    locs.add(lr.locator)
1101
1102        if len(data) == 1:
1103            return data[0]
1104        else:
1105            return b''.join(data)
1106
1107    @must_be_writable
1108    @synchronized
1109    def writeto(self, offset, data, num_retries):
1110        """Write `data` to the file starting at `offset`.
1111
1112        This will update existing bytes and/or extend the size of the file as
1113        necessary.
1114
1115        """
1116        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1117            data = data.encode()
1118        if len(data) == 0:
1119            return
1120
1121        if offset > self.size():
1122            self.truncate(offset)
1123
1124        if len(data) > config.KEEP_BLOCK_SIZE:
1125            # Chunk it up into smaller writes
1126            n = 0
1127            dataview = memoryview(data)
1128            while n < len(data):
1129                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1130                n += config.KEEP_BLOCK_SIZE
1131            return
1132
1133        self.set_committed(False)
1134
1135        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1136            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1137
1138        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1139            self._current_bblock.repack_writes()
1140            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1141                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1142                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1143
1144        self._current_bblock.append(data)
1145
1146        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1147
1148        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1149
1150        return len(data)
1151
1152    @synchronized
1153    def flush(self, sync=True, num_retries=0):
1154        """Flush the current bufferblock to Keep.
1155
1156        :sync:
1157          If True, commit block synchronously, wait until buffer block has been written.
1158          If False, commit block asynchronously, return immediately after putting block into
1159          the keep put queue.
1160        """
1161        if self.committed():
1162            return
1163
1164        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1165            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1166                self._current_bblock.repack_writes()
1167            if self._current_bblock.state() != _BufferBlock.DELETED:
1168                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1169
1170        if sync:
1171            to_delete = set()
1172            for s in self._segments:
1173                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1174                if bb:
1175                    if bb.state() != _BufferBlock.COMMITTED:
1176                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1177                    to_delete.add(s.locator)
1178                    s.locator = bb.locator()
1179            for s in to_delete:
1180                # Don't delete the bufferblock if it's owned by many files. It'll be
1181                # deleted after all of its owners are flush()ed.
1182                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1183                    self.parent._my_block_manager().delete_bufferblock(s)
1184
1185        self.parent.notify(MOD, self.parent, self.name, (self, self))
1186
1187    @must_be_writable
1188    @synchronized
1189    def add_segment(self, blocks, pos, size):
1190        """Add a segment to the end of the file.
1191
1192        `pos` and `offset` reference a section of the stream described by
1193        `blocks` (a list of Range objects)
1194
1195        """
1196        self._add_segment(blocks, pos, size)
1197
1198    def _add_segment(self, blocks, pos, size):
1199        """Internal implementation of add_segment."""
1200        self.set_committed(False)
1201        for lr in locators_and_ranges(blocks, pos, size):
1202            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1203            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1204            self._segments.append(r)
1205
1206    @synchronized
1207    def size(self):
1208        """Get the file size."""
1209        if self._segments:
1210            n = self._segments[-1]
1211            return n.range_start + n.range_size
1212        else:
1213            return 0
1214
1215    @synchronized
1216    def manifest_text(self, stream_name=".", portable_locators=False,
1217                      normalize=False, only_committed=False):
1218        buf = ""
1219        filestream = []
1220        for segment in self._segments:
1221            loc = segment.locator
1222            if self.parent._my_block_manager().is_bufferblock(loc):
1223                if only_committed:
1224                    continue
1225                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1226            if portable_locators:
1227                loc = KeepLocator(loc).stripped()
1228            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1229                                 segment.segment_offset, segment.range_size))
1230        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1231        buf += "\n"
1232        return buf
1233
1234    @must_be_writable
1235    @synchronized
1236    def _reparent(self, newparent, newname):
1237        self.set_committed(False)
1238        self.flush(sync=True)
1239        self.parent.remove(self.name)
1240        self.parent = newparent
1241        self.name = newname
1242        self.lock = self.parent.root_collection().lock

Represent a file in a Collection.

ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.

This object may be accessed from multiple threads.
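
ArvadosFile objects are not normally constructed directly; they are created and owned by a Collection, and application code usually reaches them through the reader/writer objects returned by Collection.open(). A hedged sketch, assuming arvados.collection.Collection and a configured API/Keep client (closing the writer flushes buffered data to Keep):

    import arvados.collection

    coll = arvados.collection.Collection()          # new, writable collection
    with coll.open('notes.txt', 'wb') as writer:    # binary mode: an ArvadosFileWriter
        writer.write(b'hello\n')                    # delegates to the underlying ArvadosFile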

ArvadosFile(parent, name, stream=[], segments=[])
831    def __init__(self, parent, name, stream=[], segments=[]):
832        """
833        ArvadosFile constructor.
834
835        :stream:
836          a list of Range objects representing a block stream
837
838        :segments:
839          a list of Range objects representing segments
840        """
841        self.parent = parent
842        self.name = name
843        self._writers = set()
844        self._committed = False
845        self._segments = []
846        self.lock = parent.root_collection().lock
847        for s in segments:
848            self._add_segment(stream, s.locator, s.range_size)
849        self._current_bblock = None
850        self._read_counter = 0

ArvadosFile constructor.

:stream: a list of Range objects representing a block stream

:segments: a list of Range objects representing segments

parent
name
lock
def writable(self):
852    def writable(self):
853        return self.parent.writable()
@synchronized
def permission_expired(self, as_of_dt=None):
855    @synchronized
856    def permission_expired(self, as_of_dt=None):
857        """Returns True if any of the segment's locators is expired"""
858        for r in self._segments:
859            if KeepLocator(r.locator).permission_expired(as_of_dt):
860                return True
861        return False

Returns True if any of the segments’ locators has expired

@synchronized
def has_remote_blocks(self):
863    @synchronized
864    def has_remote_blocks(self):
865        """Returns True if any of the segment's locators has a +R signature"""
866
867        for s in self._segments:
868            if '+R' in s.locator:
869                return True
870        return False

Returns True if any of the segments’ locators has a +R signature

@synchronized
def segments(self):
895    @synchronized
896    def segments(self):
897        return copy.copy(self._segments)
@synchronized
def clone(self, new_parent, new_name):
899    @synchronized
900    def clone(self, new_parent, new_name):
901        """Make a copy of this file."""
902        cp = ArvadosFile(new_parent, new_name)
903        cp.replace_contents(self)
904        return cp

Make a copy of this file.

@must_be_writable
@synchronized
def replace_contents(self, other):
906    @must_be_writable
907    @synchronized
908    def replace_contents(self, other):
909        """Replace segments of this file with segments from another `ArvadosFile` object."""
910
911        map_loc = {}
912        self._segments = []
913        for other_segment in other.segments():
914            new_loc = other_segment.locator
915            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
916                if other_segment.locator not in map_loc:
917                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
918                    if bufferblock.state() != _BufferBlock.WRITABLE:
919                        map_loc[other_segment.locator] = bufferblock.locator()
920                    else:
921                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
922                new_loc = map_loc[other_segment.locator]
923
924            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
925
926        self.set_committed(False)

Replace segments of this file with segments from another ArvadosFile object.

@synchronized
def set_segments(self, segs):
961    @synchronized
962    def set_segments(self, segs):
963        self._segments = segs
@synchronized
def set_committed(self, value=True):
965    @synchronized
966    def set_committed(self, value=True):
967        """Set committed flag.
968
969        If value is True, set committed to be True.
970
971        If value is False, set committed to be False for this and all parents.
972        """
973        if value == self._committed:
974            return
975        self._committed = value
976        if self._committed is False and self.parent is not None:
977            self.parent.set_committed(False)

Set committed flag.

If value is True, set committed to be True.

If value is False, set committed to be False for this and all parents.

@synchronized
def committed(self):
979    @synchronized
980    def committed(self):
981        """Get whether this is committed or not."""
982        return self._committed

Get whether this is committed or not.

@synchronized
def add_writer(self, writer):
984    @synchronized
985    def add_writer(self, writer):
986        """Add an ArvadosFileWriter reference to the list of writers"""
987        if isinstance(writer, ArvadosFileWriter):
988            self._writers.add(writer)

Add an ArvadosFileWriter reference to the list of writers

@synchronized
def remove_writer(self, writer, flush):
 990    @synchronized
 991    def remove_writer(self, writer, flush):
 992        """
 993        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 994        and do some block maintenance tasks.
 995        """
 996        self._writers.remove(writer)
 997
 998        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 999            # File writer closed, not small enough for repacking
1000            self.flush()
1001        elif self.closed():
1002            # All writers closed and size is adequate for repacking
1003            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks.

def closed(self):
1005    def closed(self):
1006        """
1007        Get whether this is closed or not. When the writers list is empty, the file
1008        is supposed to be closed.
1009        """
1010        return len(self._writers) == 0

Get whether this is closed or not. When the writers list is empty, the file is supposed to be closed.

@must_be_writable
@synchronized
def truncate(self, size):
1012    @must_be_writable
1013    @synchronized
1014    def truncate(self, size):
1015        """Shrink or expand the size of the file.
1016
1017        If `size` is less than the size of the file, the file contents after
1018        `size` will be discarded.  If `size` is greater than the current size
1019        of the file, it will be filled with zero bytes.
1020
1021        """
1022        if size < self.size():
1023            new_segs = []
1024            for r in self._segments:
1025                range_end = r.range_start+r.range_size
1026                if r.range_start >= size:
1027                    # segment is past the truncate size, all done
1028                    break
1029                elif size < range_end:
1030                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1031                    nr.segment_offset = r.segment_offset
1032                    new_segs.append(nr)
1033                    break
1034                else:
1035                    new_segs.append(r)
1036
1037            self._segments = new_segs
1038            self.set_committed(False)
1039        elif size > self.size():
1040            padding = self.parent._my_block_manager().get_padding_block()
1041            diff = size - self.size()
1042            while diff > config.KEEP_BLOCK_SIZE:
1043                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1044                diff -= config.KEEP_BLOCK_SIZE
1045            if diff > 0:
1046                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1047            self.set_committed(False)
1048        else:
1049            # size == self.size()
1050            pass

Shrink or expand the size of the file.

If size is less than the size of the file, the file contents after size will be discarded. If size is greater than the current size of the file, it will be filled with zero bytes.
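
A hedged sketch of both directions, assuming af is a writable ArvadosFile (a hypothetical handle used only for illustration):

    af.writeto(0, b'0123456789', num_retries=0)
    af.truncate(4)    # size() is now 4; bytes past offset 4 are discarded
    af.truncate(8)    # size() is now 8; the new tail reads back as zero bytes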

def readfrom(self, offset, size, num_retries, exact=False):
1052    def readfrom(self, offset, size, num_retries, exact=False):
1053        """Read up to `size` bytes from the file starting at `offset`.
1054
1055        :exact:
1056         If False (default), return less data than requested if the read
1057         crosses a block boundary and the next block isn't cached.  If True,
1058         only return less data than requested when hitting EOF.
1059        """
1060
1061        with self.lock:
1062            if size == 0 or offset >= self.size():
1063                return b''
1064            readsegs = locators_and_ranges(self._segments, offset, size)
1065
1066            prefetch = None
1067            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1068            if prefetch_lookahead:
1069                # Doing prefetch on every read() call is surprisingly expensive
1070                # when we're trying to deliver data at 600+ MiBps and want
1071                # the read() fast path to be as lightweight as possible.
1072                #
1073                # Only prefetching every 128 read operations
1074                # dramatically reduces the overhead while still
1075                # getting the benefit of prefetching (e.g. when
1076                # reading 128 KiB at a time, it checks for prefetch
1077                # every 16 MiB).
1078                self._read_counter = (self._read_counter+1) % 128
1079                if self._read_counter == 1:
1080                    prefetch = locators_and_ranges(self._segments,
1081                                                   offset + size,
1082                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1083                                                   limit=(1+prefetch_lookahead))
1084
1085        locs = set()
1086        data = []
1087        for lr in readsegs:
1088            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1089            if block:
1090                blockview = memoryview(block)
1091                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1092                locs.add(lr.locator)
1093            else:
1094                break
1095
1096        if prefetch:
1097            for lr in prefetch:
1098                if lr.locator not in locs:
1099                    self.parent._my_block_manager().block_prefetch(lr.locator)
1100                    locs.add(lr.locator)
1101
1102        if len(data) == 1:
1103            return data[0]
1104        else:
1105            return b''.join(data)

Read up to size bytes from the file starting at offset.

:exact: If False (default), return less data than requested if the read crosses a block boundary and the next block isn’t cached. If True, only return less data than requested when hitting EOF.
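
A hedged sketch of the difference, again assuming a hypothetical ArvadosFile handle af:

    af.readfrom(0, 2**20, num_retries=3)              # may return a short read at an
                                                      # uncached block boundary
    af.readfrom(0, 2**20, num_retries=3, exact=True)  # short only when hitting EOF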

@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
1107    @must_be_writable
1108    @synchronized
1109    def writeto(self, offset, data, num_retries):
1110        """Write `data` to the file starting at `offset`.
1111
1112        This will update existing bytes and/or extend the size of the file as
1113        necessary.
1114
1115        """
1116        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1117            data = data.encode()
1118        if len(data) == 0:
1119            return
1120
1121        if offset > self.size():
1122            self.truncate(offset)
1123
1124        if len(data) > config.KEEP_BLOCK_SIZE:
1125            # Chunk it up into smaller writes
1126            n = 0
1127            dataview = memoryview(data)
1128            while n < len(data):
1129                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1130                n += config.KEEP_BLOCK_SIZE
1131            return
1132
1133        self.set_committed(False)
1134
1135        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1136            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1137
1138        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1139            self._current_bblock.repack_writes()
1140            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1141                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1142                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1143
1144        self._current_bblock.append(data)
1145
1146        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1147
1148        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1149
1150        return len(data)

Write data to the file starting at offset.

This will update existing bytes and/or extend the size of the file as necessary.
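
A hedged sketch, assuming a writable hypothetical ArvadosFile handle af; writing past the current end first extends the file via truncate():

    af.writeto(0, b'hello', num_retries=0)    # overwrite/extend starting at offset 0
    af.writeto(10, b'world', num_retries=0)   # offsets 5-9 become zero-filled padding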

@synchronized
def flush(self, sync=True, num_retries=0):
1152    @synchronized
1153    def flush(self, sync=True, num_retries=0):
1154        """Flush the current bufferblock to Keep.
1155
1156        :sync:
1157          If True, commit block synchronously, wait until buffer block has been written.
1158          If False, commit block asynchronously, return immediately after putting block into
1159          the keep put queue.
1160        """
1161        if self.committed():
1162            return
1163
1164        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1165            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1166                self._current_bblock.repack_writes()
1167            if self._current_bblock.state() != _BufferBlock.DELETED:
1168                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1169
1170        if sync:
1171            to_delete = set()
1172            for s in self._segments:
1173                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1174                if bb:
1175                    if bb.state() != _BufferBlock.COMMITTED:
1176                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1177                    to_delete.add(s.locator)
1178                    s.locator = bb.locator()
1179            for s in to_delete:
1180                # Don't delete the bufferblock if it's owned by many files. It'll be
1181                # deleted after all of its owners are flush()ed.
1182                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1183                    self.parent._my_block_manager().delete_bufferblock(s)
1184
1185        self.parent.notify(MOD, self.parent, self.name, (self, self))

Flush the current bufferblock to Keep.

:sync: If True, commit block synchronously, wait until buffer block has been written. If False, commit block asynchronously, return immediately after putting block into the keep put queue.
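
A hedged sketch, assuming a writable hypothetical ArvadosFile handle af with buffered data:

    af.flush(sync=False)   # queue the current bufferblock for upload and return immediately
    af.flush(sync=True)    # wait until all of this file's buffered blocks are in Keep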

@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
1187    @must_be_writable
1188    @synchronized
1189    def add_segment(self, blocks, pos, size):
1190        """Add a segment to the end of the file.
1191
1192        `pos` and `offset` reference a section of the stream described by
1193        `blocks` (a list of Range objects)
1194
1195        """
1196        self._add_segment(blocks, pos, size)

Add a segment to the end of the file.

pos and size reference a section of the stream described by blocks (a list of Range objects)

@synchronized
def size(self):
1206    @synchronized
1207    def size(self):
1208        """Get the file size."""
1209        if self._segments:
1210            n = self._segments[-1]
1211            return n.range_start + n.range_size
1212        else:
1213            return 0

Get the file size.

@synchronized
def manifest_text( self, stream_name='.', portable_locators=False, normalize=False, only_committed=False):
1215    @synchronized
1216    def manifest_text(self, stream_name=".", portable_locators=False,
1217                      normalize=False, only_committed=False):
1218        buf = ""
1219        filestream = []
1220        for segment in self._segments:
1221            loc = segment.locator
1222            if self.parent._my_block_manager().is_bufferblock(loc):
1223                if only_committed:
1224                    continue
1225                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1226            if portable_locators:
1227                loc = KeepLocator(loc).stripped()
1228            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1229                                 segment.segment_offset, segment.range_size))
1230        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1231        buf += "\n"
1232        return buf
fuse_entry
class ArvadosFileReader(ArvadosFileReaderBase):
1245class ArvadosFileReader(ArvadosFileReaderBase):
1246    """Wraps ArvadosFile in a file-like object supporting reading only.
1247
1248    Be aware that this class is NOT thread safe as there is no locking around
1249    updating file pointer.
1250
1251    """
1252
1253    def __init__(self, arvadosfile, mode="r", num_retries=None):
1254        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1255        self.arvadosfile = arvadosfile
1256
1257    def size(self):
1258        return self.arvadosfile.size()
1259
1260    def stream_name(self):
1261        return self.arvadosfile.parent.stream_name()
1262
1263    def readinto(self, b):
1264        data = self.read(len(b))
1265        b[:len(data)] = data
1266        return len(data)
1267
1268    @_FileLikeObjectBase._before_close
1269    @retry_method
1270    def read(self, size=None, num_retries=None):
1271        """Read up to `size` bytes from the file and return the result.
1272
1273        Starts at the current file position.  If `size` is None, read the
1274        entire remainder of the file.
1275        """
1276        if size is None:
1277            data = []
1278            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1279            while rd:
1280                data.append(rd)
1281                self._filepos += len(rd)
1282                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1283            return b''.join(data)
1284        else:
1285            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1286            self._filepos += len(data)
1287            return data
1288
1289    @_FileLikeObjectBase._before_close
1290    @retry_method
1291    def readfrom(self, offset, size, num_retries=None):
1292        """Read up to `size` bytes from the stream, starting at the specified file offset.
1293
1294        This method does not change the file position.
1295        """
1296        return self.arvadosfile.readfrom(offset, size, num_retries)
1297
1298    def flush(self):
1299        pass

Wraps ArvadosFile in a file-like object supporting reading only.

Be aware that this class is NOT thread safe, as there is no locking around updates to the file pointer.
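
Readers are normally obtained from a collection rather than constructed directly. A hedged sketch, assuming a collection that contains notes.txt and a configured API/Keep client (the UUID below is a placeholder):

    import arvados.collection

    coll = arvados.collection.Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')
    with coll.open('notes.txt', 'rb') as reader:   # binary mode: an ArvadosFileReader
        header = reader.read(16)                   # first 16 bytes
        rest = reader.read()                       # remainder of the file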

ArvadosFileReader(arvadosfile, mode='r', num_retries=None)
1253    def __init__(self, arvadosfile, mode="r", num_retries=None):
1254        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1255        self.arvadosfile = arvadosfile
arvadosfile
def size(self):
1257    def size(self):
1258        return self.arvadosfile.size()
def stream_name(self):
1260    def stream_name(self):
1261        return self.arvadosfile.parent.stream_name()
def readinto(self, b):
1263    def readinto(self, b):
1264        data = self.read(len(b))
1265        b[:len(data)] = data
1266        return len(data)
@retry_method
def read(self, size=None, num_retries=None):
1268    @_FileLikeObjectBase._before_close
1269    @retry_method
1270    def read(self, size=None, num_retries=None):
1271        """Read up to `size` bytes from the file and return the result.
1272
1273        Starts at the current file position.  If `size` is None, read the
1274        entire remainder of the file.
1275        """
1276        if size is None:
1277            data = []
1278            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1279            while rd:
1280                data.append(rd)
1281                self._filepos += len(rd)
1282                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1283            return b''.join(data)
1284        else:
1285            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1286            self._filepos += len(data)
1287            return data

Read up to size bytes from the file and return the result.

Starts at the current file position. If size is None, read the entire remainder of the file.
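
For example, continuing the hedged reader sketch above:

    chunk = reader.read(65536)   # up to 64 KiB from the current position
    rest = reader.read()         # size=None: everything from here to end of file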

@retry_method
def readfrom(self, offset, size, num_retries=None):
1289    @_FileLikeObjectBase._before_close
1290    @retry_method
1291    def readfrom(self, offset, size, num_retries=None):
1292        """Read up to `size` bytes from the stream, starting at the specified file offset.
1293
1294        This method does not change the file position.
1295        """
1296        return self.arvadosfile.readfrom(offset, size, num_retries)

Read up to size bytes from the stream, starting at the specified file offset.

This method does not change the file position.
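
This makes readfrom() convenient for random access without disturbing a sequential read (hedged sketch, same assumed reader):

    pos = reader.tell()
    tail = reader.readfrom(reader.size() - 10, 10)   # last 10 bytes
    assert reader.tell() == pos                      # position unchanged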

def flush(self):
1298    def flush(self):
1299        pass
class ArvadosFileWriter(ArvadosFileReader):
1302class ArvadosFileWriter(ArvadosFileReader):
1303    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1304
1305    Be aware that this class is NOT thread safe as there is no locking around
1306    updating file pointer.
1307
1308    """
1309
1310    def __init__(self, arvadosfile, mode, num_retries=None):
1311        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1312        self.arvadosfile.add_writer(self)
1313
1314    def writable(self):
1315        return True
1316
1317    @_FileLikeObjectBase._before_close
1318    @retry_method
1319    def write(self, data, num_retries=None):
1320        if self.mode[0] == "a":
1321            self._filepos = self.size()
1322        self.arvadosfile.writeto(self._filepos, data, num_retries)
1323        self._filepos += len(data)
1324        return len(data)
1325
1326    @_FileLikeObjectBase._before_close
1327    @retry_method
1328    def writelines(self, seq, num_retries=None):
1329        for s in seq:
1330            self.write(s, num_retries=num_retries)
1331
1332    @_FileLikeObjectBase._before_close
1333    def truncate(self, size=None):
1334        if size is None:
1335            size = self._filepos
1336        self.arvadosfile.truncate(size)
1337
1338    @_FileLikeObjectBase._before_close
1339    def flush(self):
1340        self.arvadosfile.flush()
1341
1342    def close(self, flush=True):
1343        if not self.closed:
1344            self.arvadosfile.remove_writer(self, flush)
1345            super(ArvadosFileWriter, self).close()

Wraps ArvadosFile in a file-like object supporting both reading and writing.

Be aware that this class is NOT thread safe, as there is no locking around updates to the file pointer.
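
A hedged sketch, assuming a writable collection, a configured API/Keep client, and the Collection.open/save_new methods of current SDK versions; leaving the with-block closes the writer and flushes buffered data to Keep:

    import arvados.collection

    coll = arvados.collection.Collection()
    with coll.open('report.txt', 'wb') as writer:   # binary mode: an ArvadosFileWriter
        writer.write(b'line 1\n')
        writer.writelines([b'line 2\n', b'line 3\n'])
    coll.save_new(name='example collection')        # persist the new collection record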

ArvadosFileWriter(arvadosfile, mode, num_retries=None)
1310    def __init__(self, arvadosfile, mode, num_retries=None):
1311        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1312        self.arvadosfile.add_writer(self)
def writable(self):
1314    def writable(self):
1315        return True
@retry_method
def write(self, data, num_retries=None):
1317    @_FileLikeObjectBase._before_close
1318    @retry_method
1319    def write(self, data, num_retries=None):
1320        if self.mode[0] == "a":
1321            self._filepos = self.size()
1322        self.arvadosfile.writeto(self._filepos, data, num_retries)
1323        self._filepos += len(data)
1324        return len(data)
@retry_method
def writelines(self, seq, num_retries=None):
1326    @_FileLikeObjectBase._before_close
1327    @retry_method
1328    def writelines(self, seq, num_retries=None):
1329        for s in seq:
1330            self.write(s, num_retries=num_retries)
def truncate(self, size=None):
1332    @_FileLikeObjectBase._before_close
1333    def truncate(self, size=None):
1334        if size is None:
1335            size = self._filepos
1336        self.arvadosfile.truncate(size)
def flush(self):
1338    @_FileLikeObjectBase._before_close
1339    def flush(self):
1340        self.arvadosfile.flush()
def close(self, flush=True):
1342    def close(self, flush=True):
1343        if not self.closed:
1344            self.arvadosfile.remove_writer(self, flush)
1345            super(ArvadosFileWriter, self).close()
class WrappableFile:
1348class WrappableFile(object):
1349    """An interface to an Arvados file that's compatible with io wrappers.
1350
1351    """
1352    def __init__(self, f):
1353        self.f = f
1354        self.closed = False
1355    def close(self):
1356        self.closed = True
1357        return self.f.close()
1358    def flush(self):
1359        return self.f.flush()
1360    def read(self, *args, **kwargs):
1361        return self.f.read(*args, **kwargs)
1362    def readable(self):
1363        return self.f.readable()
1364    def readinto(self, *args, **kwargs):
1365        return self.f.readinto(*args, **kwargs)
1366    def seek(self, *args, **kwargs):
1367        return self.f.seek(*args, **kwargs)
1368    def seekable(self):
1369        return self.f.seekable()
1370    def tell(self):
1371        return self.f.tell()
1372    def writable(self):
1373        return self.f.writable()
1374    def write(self, *args, **kwargs):
1375        return self.f.write(*args, **kwargs)

An interface to an Arvados file that’s compatible with io wrappers.
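
A hedged sketch of the intended pattern: a binary reader is wrapped so it can be handed to the standard io layer, which is how text-mode access is built on top of binary file objects (reader below is an assumed ArvadosFileReader opened in 'rb' mode, as in the sketches above):

    import io
    from arvados.arvfile import WrappableFile

    text = io.TextIOWrapper(WrappableFile(reader), encoding='utf-8')
    for line in text:
        print(line.rstrip())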

WrappableFile(f)
1352    def __init__(self, f):
1353        self.f = f
1354        self.closed = False
f
closed
def close(self):
1355    def close(self):
1356        self.closed = True
1357        return self.f.close()
def flush(self):
1358    def flush(self):
1359        return self.f.flush()
def read(self, *args, **kwargs):
1360    def read(self, *args, **kwargs):
1361        return self.f.read(*args, **kwargs)
def readable(self):
1362    def readable(self):
1363        return self.f.readable()
def readinto(self, *args, **kwargs):
1364    def readinto(self, *args, **kwargs):
1365        return self.f.readinto(*args, **kwargs)
def seek(self, *args, **kwargs):
1366    def seek(self, *args, **kwargs):
1367        return self.f.seek(*args, **kwargs)
def seekable(self):
1368    def seekable(self):
1369        return self.f.seekable()
def tell(self):
1370    def tell(self):
1371        return self.f.tell()
def writable(self):
1372    def writable(self):
1373        return self.f.writable()
def write(self, *args, **kwargs):
1374    def write(self, *args, **kwargs):
1375        return self.f.write(*args, **kwargs)