arvados.arvfile

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4
   5import bz2
   6import collections
   7import copy
   8import errno
   9import functools
  10import hashlib
  11import logging
  12import os
  13import queue
  14import re
  15import sys
  16import threading
  17import uuid
  18import zlib
  19
  20from . import config
  21from ._internal import streams
  22from .errors import KeepWriteError, AssertionError, ArgumentError
  23from .keep import KeepLocator
  24from .retry import retry_method
  25
  26MOD = "mod"
  27WRITE = "write"
  28
  29_logger = logging.getLogger('arvados.arvfile')
  30
  31def split(path):
  32    """split(path) -> streamname, filename
  33
  34    Separate the stream name and file name in a /-separated stream path and
  35    return a tuple (stream_name, file_name).  If no stream name is available,
  36    assume '.'.
  37
  38    """
  39    try:
  40        stream_name, file_name = path.rsplit('/', 1)
  41    except ValueError:  # No / in string
  42        stream_name, file_name = '.', path
  43    return stream_name, file_name
  44
  45
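# Usage sketch (illustrative values): split() is a plain path helper and can
# be exercised directly:
#
#     >>> split('output/logs/run.txt')
#     ('output/logs', 'run.txt')
#     >>> split('run.txt')
#     ('.', 'run.txt')
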
  46class UnownedBlockError(Exception):
  47    """Raised when there's a writable block without an owner on the BlockManager."""
  48    pass
  49
  50
  51class _FileLikeObjectBase(object):
  52    def __init__(self, name, mode):
  53        self.name = name
  54        self.mode = mode
  55        self.closed = False
  56
  57    @staticmethod
  58    def _before_close(orig_func):
  59        @functools.wraps(orig_func)
  60        def before_close_wrapper(self, *args, **kwargs):
  61            if self.closed:
  62                raise ValueError("I/O operation on closed stream file")
  63            return orig_func(self, *args, **kwargs)
  64        return before_close_wrapper
  65
  66    def __enter__(self):
  67        return self
  68
  69    def __exit__(self, exc_type, exc_value, traceback):
  70        try:
  71            self.close()
  72        except Exception:
  73            if exc_type is None:
  74                raise
  75
  76    def close(self):
  77        self.closed = True
  78
  79
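# Behavior sketch (hypothetical subclass, for illustration only): methods
# wrapped with _before_close raise ValueError once close() has been called,
# and instances work as context managers:
#
#     class Example(_FileLikeObjectBase):
#         @_FileLikeObjectBase._before_close
#         def ping(self):
#             return self.name
#
#     with Example('x', 'r') as f:
#         f.ping()    # returns 'x'
#     f.ping()        # raises ValueError: I/O operation on closed stream file
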
  80class ArvadosFileReaderBase(_FileLikeObjectBase):
  81    def __init__(self, name, mode, num_retries=None):
  82        super(ArvadosFileReaderBase, self).__init__(name, mode)
  83        self._filepos = 0
  84        self.num_retries = num_retries
  85        self._readline_cache = (None, None)
  86
  87    def __iter__(self):
  88        while True:
  89            data = self.readline()
  90            if not data:
  91                break
  92            yield data
  93
  94    def decompressed_name(self):
  95        return re.sub(r'\.(bz2|gz)$', '', self.name)
  96
  97    @_FileLikeObjectBase._before_close
  98    def seek(self, pos, whence=os.SEEK_SET):
  99        if whence == os.SEEK_CUR:
 100            pos += self._filepos
 101        elif whence == os.SEEK_END:
 102            pos += self.size()
 103        if pos < 0:
 104            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
 105        self._filepos = pos
 106        return self._filepos
 107
 108    def tell(self):
 109        return self._filepos
 110
 111    def readable(self):
 112        return True
 113
 114    def writable(self):
 115        return False
 116
 117    def seekable(self):
 118        return True
 119
 120    @_FileLikeObjectBase._before_close
 121    @retry_method
 122    def readall(self, size=2**20, num_retries=None):
 123        while True:
 124            data = self.read(size, num_retries=num_retries)
 125            if len(data) == 0:
 126                break
 127            yield data
 128
 129    @_FileLikeObjectBase._before_close
 130    @retry_method
 131    def readline(self, size=float('inf'), num_retries=None):
 132        cache_pos, cache_data = self._readline_cache
 133        if self.tell() == cache_pos:
 134            data = [cache_data]
 135            self._filepos += len(cache_data)
 136        else:
 137            data = [b'']
 138        data_size = len(data[-1])
 139        while (data_size < size) and (b'\n' not in data[-1]):
 140            next_read = self.read(2 ** 20, num_retries=num_retries)
 141            if not next_read:
 142                break
 143            data.append(next_read)
 144            data_size += len(next_read)
 145        data = b''.join(data)
 146        try:
 147            nextline_index = data.index(b'\n') + 1
 148        except ValueError:
 149            nextline_index = len(data)
 150        nextline_index = min(nextline_index, size)
 151        self._filepos -= len(data) - nextline_index
 152        self._readline_cache = (self.tell(), data[nextline_index:])
 153        return data[:nextline_index].decode()
 154
 155    @_FileLikeObjectBase._before_close
 156    @retry_method
 157    def decompress(self, decompress, size, num_retries=None):
 158        for segment in self.readall(size, num_retries=num_retries):
 159            data = decompress(segment)
 160            if data:
 161                yield data
 162
 163    @_FileLikeObjectBase._before_close
 164    @retry_method
 165    def readall_decompressed(self, size=2**20, num_retries=None):
 166        self.seek(0)
 167        if self.name.endswith('.bz2'):
 168            dc = bz2.BZ2Decompressor()
 169            return self.decompress(dc.decompress, size,
 170                                   num_retries=num_retries)
 171        elif self.name.endswith('.gz'):
 172            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
 173            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
 174                                   size, num_retries=num_retries)
 175        else:
 176            return self.readall(size, num_retries=num_retries)
 177
 178    @_FileLikeObjectBase._before_close
 179    @retry_method
 180    def readlines(self, sizehint=float('inf'), num_retries=None):
 181        data = []
 182        data_size = 0
 183        for s in self.readall(num_retries=num_retries):
 184            data.append(s)
 185            data_size += len(s)
 186            if data_size >= sizehint:
 187                break
 188        return b''.join(data).decode().splitlines(True)
 189
 190    def size(self):
 191        raise IOError(errno.ENOSYS, "Not implemented")
 192
 193    def read(self, size, num_retries=None):
 194        raise IOError(errno.ENOSYS, "Not implemented")
 195
 196    def readfrom(self, start, size, num_retries=None):
 197        raise IOError(errno.ENOSYS, "Not implemented")
 198
 199
 200def synchronized(orig_func):
 201    @functools.wraps(orig_func)
 202    def synchronized_wrapper(self, *args, **kwargs):
 203        with self.lock:
 204            return orig_func(self, *args, **kwargs)
 205    return synchronized_wrapper
 206
 207
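# Usage sketch (hypothetical class, for illustration only): @synchronized
# assumes the instance has a `lock` attribute supporting the context manager
# protocol, such as a threading.Lock or the NoopLock defined below.
#
#     class Counter:
#         def __init__(self):
#             self.lock = threading.Lock()
#             self.n = 0
#
#         @synchronized
#         def bump(self):
#             self.n += 1    # runs with self.lock held
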
 208class StateChangeError(Exception):
 209    def __init__(self, message, state, nextstate):
 210        super(StateChangeError, self).__init__(message)
 211        self.state = state
 212        self.nextstate = nextstate
 213
 214class _BufferBlock(object):
 215    """A stand-in for a Keep block that is in the process of being written.
 216
 217    Writers can append to it, get the size, and compute the Keep locator.
 218    There are three valid states:
 219
 220    WRITABLE
 221      Can append to block.
 222
 223    PENDING
 224      Block is in the process of being uploaded to Keep, append is an error.
 225
 226    COMMITTED
 227      The block has been written to Keep, its internal buffer has been
 228      released, fetching the block will fetch it via keep client (since we
 229      discarded the internal copy), and identifiers referring to the BufferBlock
 230      can be replaced with the block locator.
 231
 232    """
 233
 234    WRITABLE = 0
 235    PENDING = 1
 236    COMMITTED = 2
 237    ERROR = 3
 238    DELETED = 4
 239
 240    def __init__(self, blockid, starting_capacity, owner):
 241        """
 242        :blockid:
 243          the identifier for this block
 244
 245        :starting_capacity:
 246          the initial buffer capacity
 247
 248        :owner:
 249          ArvadosFile that owns this block
 250
 251        """
 252        self.blockid = blockid
 253        self.buffer_block = bytearray(starting_capacity)
 254        self.buffer_view = memoryview(self.buffer_block)
 255        self.write_pointer = 0
 256        self._state = _BufferBlock.WRITABLE
 257        self._locator = None
 258        self.owner = owner
 259        self.lock = threading.Lock()
 260        self.wait_for_commit = threading.Event()
 261        self.error = None
 262
 263    @synchronized
 264    def append(self, data):
 265        """Append some data to the buffer.
 266
 267        Only valid if the block is in WRITABLE state.  Implements an expanding
 268        buffer, doubling capacity as needed to accommodate all the data.
 269
 270        """
 271        if self._state == _BufferBlock.WRITABLE:
 272            if not isinstance(data, bytes) and not isinstance(data, memoryview):
 273                data = data.encode()
 274            while (self.write_pointer+len(data)) > len(self.buffer_block):
 275                new_buffer_block = bytearray(len(self.buffer_block) * 2)
 276                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
 277                self.buffer_block = new_buffer_block
 278                self.buffer_view = memoryview(self.buffer_block)
 279            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
 280            self.write_pointer += len(data)
 281            self._locator = None
 282        else:
 283            raise AssertionError("Buffer block is not writable")
 284
 285    STATE_TRANSITIONS = frozenset([
 286            (WRITABLE, PENDING),
 287            (PENDING, COMMITTED),
 288            (PENDING, ERROR),
 289            (ERROR, PENDING)])
 290
 291    @synchronized
 292    def set_state(self, nextstate, val=None):
 293        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
 294            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
 295        self._state = nextstate
 296
 297        if self._state == _BufferBlock.PENDING:
 298            self.wait_for_commit.clear()
 299
 300        if self._state == _BufferBlock.COMMITTED:
 301            self._locator = val
 302            self.buffer_view = None
 303            self.buffer_block = None
 304            self.wait_for_commit.set()
 305
 306        if self._state == _BufferBlock.ERROR:
 307            self.error = val
 308            self.wait_for_commit.set()
 309
 310    @synchronized
 311    def state(self):
 312        return self._state
 313
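    # Lifecycle sketch (illustrative, standalone block with no owner): data is
    # appended while WRITABLE, the block is marked PENDING before upload, and
    # the Keep locator is recorded once the upload finishes:
    #
    #     bb = _BufferBlock('bufferblock0', starting_capacity=16, owner=None)
    #     bb.append(b'hello')                   # grows the buffer as needed
    #     bb.set_state(_BufferBlock.PENDING)
    #     bb.set_state(_BufferBlock.COMMITTED,
    #                  '5d41402abc4b2a76b9719d911017c592+5')  # md5 of b'hello'
    #     bb.state() == _BufferBlock.COMMITTED  # True
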
 314    def size(self):
 315        """The amount of data written to the buffer."""
 316        return self.write_pointer
 317
 318    @synchronized
 319    def locator(self):
 320        """The Keep locator for this buffer's contents."""
 321        if self._locator is None:
 322            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
 323        return self._locator
 324
 325    @synchronized
 326    def clone(self, new_blockid, owner):
 327        if self._state == _BufferBlock.COMMITTED:
 328            raise AssertionError("Cannot duplicate committed buffer block")
 329        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
 330        bufferblock.append(self.buffer_view[0:self.size()])
 331        return bufferblock
 332
 333    @synchronized
 334    def clear(self):
 335        self._state = _BufferBlock.DELETED
 336        self.owner = None
 337        self.buffer_block = None
 338        self.buffer_view = None
 339
 340    @synchronized
 341    def repack_writes(self):
 342        """Optimize buffer block by repacking segments in file sequence.
 343
 344        When the client makes random writes, they appear in the buffer block in
 345        the sequence they were written rather than the sequence they appear in
 346        the file.  This makes for inefficient, fragmented manifests.  Attempt
 347        to optimize by repacking writes in file sequence.
 348
 349        """
 350        if self._state != _BufferBlock.WRITABLE:
 351            raise AssertionError("Cannot repack non-writable block")
 352
 353        segs = self.owner.segments()
 354
 355        # Collect the segments that reference the buffer block.
 356        bufferblock_segs = [s for s in segs if s.locator == self.blockid]
 357
 358        # Collect total data referenced by segments (could be smaller than
 359        # bufferblock size if a portion of the file was written and
 360        # then overwritten).
 361        write_total = sum([s.range_size for s in bufferblock_segs])
 362
 363        if write_total < self.size() or len(bufferblock_segs) > 1:
 364            # If there's more than one segment referencing this block, it is
 365            # due to out-of-order writes and will produce a fragmented
 366            # manifest, so try to optimize by re-packing into a new buffer.
 367            contents = self.buffer_view[0:self.write_pointer].tobytes()
 368            new_bb = _BufferBlock(None, write_total, None)
 369            for t in bufferblock_segs:
 370                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
 371                t.segment_offset = new_bb.size() - t.range_size
 372
 373            self.buffer_block = new_bb.buffer_block
 374            self.buffer_view = new_bb.buffer_view
 375            self.write_pointer = new_bb.write_pointer
 376            self._locator = None
 377            new_bb.clear()
 378            self.owner.set_segments(segs)
 379
 380    def __repr__(self):
 381        return "<BufferBlock %s>" % (self.blockid)
 382
 383
 384class NoopLock(object):
 385    def __enter__(self):
 386        return self
 387
 388    def __exit__(self, exc_type, exc_value, traceback):
 389        pass
 390
 391    def acquire(self, blocking=False):
 392        pass
 393
 394    def release(self):
 395        pass
 396
 397
 398def must_be_writable(orig_func):
 399    @functools.wraps(orig_func)
 400    def must_be_writable_wrapper(self, *args, **kwargs):
 401        if not self.writable():
 402            raise IOError(errno.EROFS, "Collection is read-only.")
 403        return orig_func(self, *args, **kwargs)
 404    return must_be_writable_wrapper
 405
 406
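# Usage sketch (hypothetical class, for illustration only): must_be_writable
# assumes the instance exposes a writable() method, and turns attempts to
# modify read-only objects into EROFS errors:
#
#     class ReadOnlyThing:
#         def writable(self):
#             return False
#
#         @must_be_writable
#         def mutate(self):
#             pass
#
#     ReadOnlyThing().mutate()    # raises IOError(errno.EROFS, ...)
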
 407class _BlockManager(object):
 408    """BlockManager handles buffer blocks.
 409
 410    Also handles background block uploads, and background block prefetch for a
 411    Collection of ArvadosFiles.
 412
 413    """
 414
 415    DEFAULT_PUT_THREADS = 2
 416
 417    def __init__(self, keep,
 418                 copies=None,
 419                 put_threads=None,
 420                 num_retries=None,
 421                 storage_classes_func=None):
 422        """keep: KeepClient object to use"""
 423        self._keep = keep
 424        self._bufferblocks = collections.OrderedDict()
 425        self._put_queue = None
 426        self._put_threads = None
 427        self.lock = threading.Lock()
 428        self.prefetch_lookahead = self._keep.num_prefetch_threads
 429        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
 430        self.copies = copies
 431        self.storage_classes = storage_classes_func or (lambda: [])
 432        self._pending_write_size = 0
 433        self.threads_lock = threading.Lock()
 434        self.padding_block = None
 435        self.num_retries = num_retries
 436
 437    @synchronized
 438    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
 439        """Allocate a new, empty bufferblock in WRITABLE state and return it.
 440
 441        :blockid:
 442          optional block identifier, otherwise one will be automatically assigned
 443
 444        :starting_capacity:
 445          optional capacity, otherwise will use default capacity
 446
 447        :owner:
 448          ArvadosFile that owns this block
 449
 450        """
 451        return self._alloc_bufferblock(blockid, starting_capacity, owner)
 452
 453    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
 454        if blockid is None:
 455            blockid = str(uuid.uuid4())
 456        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
 457        self._bufferblocks[bufferblock.blockid] = bufferblock
 458        return bufferblock
 459
 460    @synchronized
 461    def dup_block(self, block, owner):
 462        """Create a new bufferblock initialized with the content of an existing bufferblock.
 463
 464        :block:
 465          the buffer block to copy.
 466
 467        :owner:
 468          ArvadosFile that owns the new block
 469
 470        """
 471        new_blockid = str(uuid.uuid4())
 472        bufferblock = block.clone(new_blockid, owner)
 473        self._bufferblocks[bufferblock.blockid] = bufferblock
 474        return bufferblock
 475
 476    @synchronized
 477    def is_bufferblock(self, locator):
 478        return locator in self._bufferblocks
 479
 480    def _commit_bufferblock_worker(self):
 481        """Background uploader thread."""
 482
 483        while True:
 484            try:
 485                bufferblock = self._put_queue.get()
 486                if bufferblock is None:
 487                    return
 488
 489                if self.copies is None:
 490                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
 491                else:
 492                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
 493                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 494            except Exception as e:
 495                bufferblock.set_state(_BufferBlock.ERROR, e)
 496            finally:
 497                if self._put_queue is not None:
 498                    self._put_queue.task_done()
 499
 500    def start_put_threads(self):
 501        with self.threads_lock:
 502            if self._put_threads is None:
 503                # Start uploader threads.
 504
 505                # If we don't limit the Queue size, the upload queue can quickly
 506                # grow to take up gigabytes of RAM if the writing process is
 507                # generating data more quickly than it can be sent to the Keep
 508                # servers.
 509                #
 510                # With two upload threads and a queue size of 2, this means up to 4
 511                # blocks pending.  If they are full 64 MiB blocks, that means up to
 512                # 256 MiB of internal buffering, which is the same size as the
 513                # default download block cache in KeepClient.
 514                self._put_queue = queue.Queue(maxsize=2)
 515
 516                self._put_threads = []
 517                for i in range(0, self.num_put_threads):
 518                    thread = threading.Thread(target=self._commit_bufferblock_worker)
 519                    self._put_threads.append(thread)
 520                    thread.daemon = True
 521                    thread.start()
 522
 523    @synchronized
 524    def stop_threads(self):
 525        """Shut down and wait for background upload and download threads to finish."""
 526
 527        if self._put_threads is not None:
 528            for t in self._put_threads:
 529                self._put_queue.put(None)
 530            for t in self._put_threads:
 531                t.join()
 532        self._put_threads = None
 533        self._put_queue = None
 534
 535    def __enter__(self):
 536        return self
 537
 538    def __exit__(self, exc_type, exc_value, traceback):
 539        self.stop_threads()
 540
 541    @synchronized
 542    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
 543        """Packs small blocks together before uploading"""
 544
 545        self._pending_write_size += closed_file_size
 546
 547        # Check if there are enough small blocks for filling up one in full
 548        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
 549            return
 550
 551        # Search blocks ready for getting packed together before being
 552        # committed to Keep.
 553        # A WRITABLE block always has an owner.
 554        # A WRITABLE block with its owner.closed() implies that its
 555        # size is <= KEEP_BLOCK_SIZE/2.
 556        try:
 557            small_blocks = [b for b in self._bufferblocks.values()
 558                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
 559        except AttributeError:
 560            # Writable blocks without owner shouldn't exist.
 561            raise UnownedBlockError()
 562
 563        if len(small_blocks) <= 1:
 564            # Not enough small blocks for repacking
 565            return
 566
 567        for bb in small_blocks:
 568            bb.repack_writes()
 569
 570        # Update the pending write size count with its true value, just in case
 571        # some small file was opened, written and closed several times.
 572        self._pending_write_size = sum([b.size() for b in small_blocks])
 573
 574        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
 575            return
 576
 577        new_bb = self._alloc_bufferblock()
 578        new_bb.owner = []
 579        files = []
 580        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
 581            bb = small_blocks.pop(0)
 582            new_bb.owner.append(bb.owner)
 583            self._pending_write_size -= bb.size()
 584            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
 585            files.append((bb, new_bb.write_pointer - bb.size()))
 586
 587        self.commit_bufferblock(new_bb, sync=sync)
 588
 589        for bb, new_bb_segment_offset in files:
 590            newsegs = bb.owner.segments()
 591            for s in newsegs:
 592                if s.locator == bb.blockid:
 593                    s.locator = new_bb.blockid
 594                    s.segment_offset = new_bb_segment_offset+s.segment_offset
 595            bb.owner.set_segments(newsegs)
 596            self._delete_bufferblock(bb.blockid)
 597
 598    def commit_bufferblock(self, block, sync):
 599        """Initiate a background upload of a bufferblock.
 600
 601        :block:
 602          The block object to upload
 603
 604        :sync:
 605          If `sync` is True, upload the block synchronously.
 606          If `sync` is False, upload the block asynchronously.  This will
 607          return immediately unless the upload queue is at capacity, in
 608          which case it will wait on an upload queue slot.
 609
 610        """
 611        try:
 612            # Mark the block as PENDING to disallow any more appends.
 613            block.set_state(_BufferBlock.PENDING)
 614        except StateChangeError as e:
 615            if e.state == _BufferBlock.PENDING:
 616                if sync:
 617                    block.wait_for_commit.wait()
 618                else:
 619                    return
 620            if block.state() == _BufferBlock.COMMITTED:
 621                return
 622            elif block.state() == _BufferBlock.ERROR:
 623                raise block.error
 624            else:
 625                raise
 626
 627        if sync:
 628            try:
 629                if self.copies is None:
 630                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
 631                else:
 632                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
 633                block.set_state(_BufferBlock.COMMITTED, loc)
 634            except Exception as e:
 635                block.set_state(_BufferBlock.ERROR, e)
 636                raise
 637        else:
 638            self.start_put_threads()
 639            self._put_queue.put(block)
 640
 641    @synchronized
 642    def get_bufferblock(self, locator):
 643        return self._bufferblocks.get(locator)
 644
 645    @synchronized
 646    def get_padding_block(self):
 647        """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
 648        when using truncate() to extend the size of a file.
 649
 650        For reference (and possible future optimization), the md5sum of the
 651        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
 652
 653        """
 654
 655        if self.padding_block is None:
 656            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
 657            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
 658            self.commit_bufferblock(self.padding_block, False)
 659        return self.padding_block
 660
 661    @synchronized
 662    def delete_bufferblock(self, locator):
 663        self._delete_bufferblock(locator)
 664
 665    def _delete_bufferblock(self, locator):
 666        if locator in self._bufferblocks:
 667            bb = self._bufferblocks[locator]
 668            bb.clear()
 669            del self._bufferblocks[locator]
 670
 671    def get_block_contents(self, locator, num_retries, cache_only=False):
 672        """Fetch a block.
 673
 674        First checks to see if the locator is a BufferBlock and return that, if
 675        not, passes the request through to KeepClient.get().
 676
 677        """
 678        with self.lock:
 679            if locator in self._bufferblocks:
 680                bufferblock = self._bufferblocks[locator]
 681                if bufferblock.state() != _BufferBlock.COMMITTED:
 682                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
 683                else:
 684                    locator = bufferblock._locator
 685        if cache_only:
 686            return self._keep.get_from_cache(locator)
 687        else:
 688            return self._keep.get(locator, num_retries=num_retries)
 689
 690    def commit_all(self):
 691        """Commit all outstanding buffer blocks.
 692
 693        This is a synchronous call, and will not return until all buffer blocks
 694        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 695
 696        """
 697        self.repack_small_blocks(force=True, sync=True)
 698
 699        with self.lock:
 700            items = list(self._bufferblocks.items())
 701
 702        for k,v in items:
 703            if v.state() != _BufferBlock.COMMITTED and v.owner:
 704                # Ignore blocks with a list of owners, as if they're not in COMMITTED
 705                # state, they're already being committed asynchronously.
 706                if isinstance(v.owner, ArvadosFile):
 707                    v.owner.flush(sync=False)
 708
 709        with self.lock:
 710            if self._put_queue is not None:
 711                self._put_queue.join()
 712
 713                err = []
 714                for k,v in items:
 715                    if v.state() == _BufferBlock.ERROR:
 716                        err.append((v.locator(), v.error))
 717                if err:
 718                    raise KeepWriteError("Error writing some blocks", err, label="block")
 719
 720        for k,v in items:
 721            # flush again with sync=True to remove committed bufferblocks from
 722            # the segments.
 723            if v.owner:
 724                if isinstance(v.owner, ArvadosFile):
 725                    v.owner.flush(sync=True)
 726                elif isinstance(v.owner, list) and len(v.owner) > 0:
 727                    # This bufferblock is referenced by many files as a result
 728                    # of repacking small blocks, so don't delete it when flushing
 729                    # its owners, just do it after flushing them all.
 730                    for owner in v.owner:
 731                        owner.flush(sync=True)
 732                    self.delete_bufferblock(k)
 733
 734        self.stop_threads()
 735
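    # Write-path sketch (illustrative; `keep_client` is assumed to be a
    # configured arvados.keep.KeepClient): buffer blocks are allocated and
    # filled, queued for background upload, and commit_all() blocks until
    # everything is safely in Keep (raising KeepWriteError on failure).
    #
    #     with _BlockManager(keep_client) as bm:
    #         bb = bm.alloc_bufferblock()
    #         bb.append(b'some data')
    #         bm.commit_bufferblock(bb, sync=False)  # queue async upload
    #         bm.commit_all()                        # wait for all uploads
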
 736    def block_prefetch(self, locator):
 737        """Initiate a background download of a block.
 738        """
 739
 740        if not self.prefetch_lookahead:
 741            return
 742
 743        with self.lock:
 744            if locator in self._bufferblocks:
 745                return
 746
 747        self._keep.block_prefetch(locator)
 748
 749
 750class ArvadosFile(object):
 751    """Represent a file in a Collection.
 752
 753    ArvadosFile manages the underlying representation of a file in Keep as a
 754    sequence of segments spanning a set of blocks, and implements random
 755    read/write access.
 756
 757    This object may be accessed from multiple threads.
 758
 759    """
 760
 761    __slots__ = ('parent', 'name', '_writers', '_committed',
 762                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 763
 764    def __init__(self, parent, name, stream=[], segments=[]):
 765        """
 766        ArvadosFile constructor.
 767
 768        :stream:
 769          a list of Range objects representing a block stream
 770
 771        :segments:
 772          a list of Range objects representing segments
 773        """
 774        self.parent = parent
 775        self.name = name
 776        self._writers = set()
 777        self._committed = False
 778        self._segments = []
 779        self.lock = parent.root_collection().lock
 780        for s in segments:
 781            self._add_segment(stream, s.locator, s.range_size)
 782        self._current_bblock = None
 783        self._read_counter = 0
 784
 785    def writable(self):
 786        return self.parent.writable()
 787
 788    @synchronized
 789    def permission_expired(self, as_of_dt=None):
 790        """Returns True if any of the segments' locators has expired."""
 791        for r in self._segments:
 792            if KeepLocator(r.locator).permission_expired(as_of_dt):
 793                return True
 794        return False
 795
 796    @synchronized
 797    def has_remote_blocks(self):
 798        """Returns True if any of the segments' locators has a +R signature."""
 799
 800        for s in self._segments:
 801            if '+R' in s.locator:
 802                return True
 803        return False
 804
 805    @synchronized
 806    def _copy_remote_blocks(self, remote_blocks={}):
 807        """Ask Keep to copy remote blocks and point to their local copies.
 808
 809        This is called from the parent Collection.
 810
 811        :remote_blocks:
 812            Shared cache of remote to local block mappings. This is used to avoid
 813            doing extra work when blocks are shared by more than one file in
 814            different subdirectories.
 815        """
 816
 817        for s in self._segments:
 818            if '+R' in s.locator:
 819                try:
 820                    loc = remote_blocks[s.locator]
 821                except KeyError:
 822                    loc = self.parent._my_keep().refresh_signature(s.locator)
 823                    remote_blocks[s.locator] = loc
 824                s.locator = loc
 825                self.parent.set_committed(False)
 826        return remote_blocks
 827
 828    @synchronized
 829    def segments(self):
 830        return copy.copy(self._segments)
 831
 832    @synchronized
 833    def clone(self, new_parent, new_name):
 834        """Make a copy of this file."""
 835        cp = ArvadosFile(new_parent, new_name)
 836        cp.replace_contents(self)
 837        return cp
 838
 839    @must_be_writable
 840    @synchronized
 841    def replace_contents(self, other):
 842        """Replace segments of this file with segments from another `ArvadosFile` object."""
 843
 844        map_loc = {}
 845        self._segments = []
 846        for other_segment in other.segments():
 847            new_loc = other_segment.locator
 848            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
 849                if other_segment.locator not in map_loc:
 850                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
 851                    if bufferblock.state() != _BufferBlock.WRITABLE:
 852                        map_loc[other_segment.locator] = bufferblock.locator()
 853                    else:
 854                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
 855                new_loc = map_loc[other_segment.locator]
 856
 857            self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 858
 859        self.set_committed(False)
 860
 861    def __eq__(self, other):
 862        if other is self:
 863            return True
 864        if not isinstance(other, ArvadosFile):
 865            return False
 866
 867        othersegs = other.segments()
 868        with self.lock:
 869            if len(self._segments) != len(othersegs):
 870                return False
 871            for i in range(0, len(othersegs)):
 872                seg1 = self._segments[i]
 873                seg2 = othersegs[i]
 874                loc1 = seg1.locator
 875                loc2 = seg2.locator
 876
 877                if self.parent._my_block_manager().is_bufferblock(loc1):
 878                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
 879
 880                if other.parent._my_block_manager().is_bufferblock(loc2):
 881                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
 882
 883                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
 884                    seg1.range_start != seg2.range_start or
 885                    seg1.range_size != seg2.range_size or
 886                    seg1.segment_offset != seg2.segment_offset):
 887                    return False
 888
 889        return True
 890
 891    def __ne__(self, other):
 892        return not self.__eq__(other)
 893
 894    @synchronized
 895    def set_segments(self, segs):
 896        self._segments = segs
 897
 898    @synchronized
 899    def set_committed(self, value=True):
 900        """Set committed flag.
 901
 902        If value is True, set committed to be True.
 903
 904        If value is False, set committed to be False for this and all parents.
 905        """
 906        if value == self._committed:
 907            return
 908        self._committed = value
 909        if self._committed is False and self.parent is not None:
 910            self.parent.set_committed(False)
 911
 912    @synchronized
 913    def committed(self):
 914        """Get whether this is committed or not."""
 915        return self._committed
 916
 917    @synchronized
 918    def add_writer(self, writer):
 919        """Add an ArvadosFileWriter reference to the list of writers"""
 920        if isinstance(writer, ArvadosFileWriter):
 921            self._writers.add(writer)
 922
 923    @synchronized
 924    def remove_writer(self, writer, flush):
 925        """
 926        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 927        and do some block maintenance tasks.
 928        """
 929        self._writers.remove(writer)
 930
 931        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 932            # File writer closed, not small enough for repacking
 933            self.flush()
 934        elif self.closed():
 935            # All writers closed and size is adequate for repacking
 936            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 937
 938    def closed(self):
 939        """
 940        Get whether this is closed or not. When the writers list is empty, the file
 941        is supposed to be closed.
 942        """
 943        return len(self._writers) == 0
 944
 945    @must_be_writable
 946    @synchronized
 947    def truncate(self, size):
 948        """Shrink or expand the size of the file.
 949
 950        If `size` is less than the size of the file, the file contents after
 951        `size` will be discarded.  If `size` is greater than the current size
 952        of the file, it will be filled with zero bytes.
 953
 954        """
 955        if size < self.size():
 956            new_segs = []
 957            for r in self._segments:
 958                range_end = r.range_start+r.range_size
 959                if r.range_start >= size:
 960                    # segment is past the truncate size, all done
 961                    break
 962                elif size < range_end:
 963                    nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0)
 964                    nr.segment_offset = r.segment_offset
 965                    new_segs.append(nr)
 966                    break
 967                else:
 968                    new_segs.append(r)
 969
 970            self._segments = new_segs
 971            self.set_committed(False)
 972        elif size > self.size():
 973            padding = self.parent._my_block_manager().get_padding_block()
 974            diff = size - self.size()
 975            while diff > config.KEEP_BLOCK_SIZE:
 976                self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
 977                diff -= config.KEEP_BLOCK_SIZE
 978            if diff > 0:
 979                self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0))
 980            self.set_committed(False)
 981        else:
 982            # size == self.size()
 983            pass
 984
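    # Truncate sketch (illustrative; `afile` stands for an existing writable
    # ArvadosFile): shrinking discards trailing segments, while growing pads
    # the file with zero bytes drawn from the shared padding block.
    #
    #     afile.truncate(0)        # drop all contents
    #     afile.truncate(2**20)    # extend to 1 MiB of zeros
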
 985    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
 986        """Read up to `size` bytes from the file starting at `offset`.
 987
 988        Arguments:
 989
 990        * exact: bool --- If False (default), return less data than
 991         requested if the read crosses a block boundary and the next
 992         block isn't cached.  If True, only return less data than
 993         requested when hitting EOF.
 994
 995        * return_memoryview: bool --- If False (default) return a
 996          `bytes` object, which may entail making a copy in some
 997          situations.  If True, return a `memoryview` object which may
 998          avoid making a copy, but may be incompatible with code
 999          expecting a `bytes` object.
1000
1001        """
1002
1003        with self.lock:
1004            if size == 0 or offset >= self.size():
1005                return memoryview(b'') if return_memoryview else b''
1006            readsegs = streams.locators_and_ranges(self._segments, offset, size)
1007
1008            prefetch = None
1009            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1010            if prefetch_lookahead:
1011                # Doing prefetch on every read() call is surprisingly expensive
1012                # when we're trying to deliver data at 600+ MiBps and want
1013                # the read() fast path to be as lightweight as possible.
1014                #
1015                # Only prefetching every 128 read operations
1016                # dramatically reduces the overhead while still
1017                # getting the benefit of prefetching (e.g. when
1018                # reading 128 KiB at a time, it checks for prefetch
1019                # every 16 MiB).
1020                self._read_counter = (self._read_counter+1) % 128
1021                if self._read_counter == 1:
1022                    prefetch = streams.locators_and_ranges(
1023                        self._segments,
1024                        offset + size,
1025                        config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1026                        limit=(1+prefetch_lookahead),
1027                    )
1028
1029        locs = set()
1030        data = []
1031        for lr in readsegs:
1032            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1033            if block:
1034                blockview = memoryview(block)
1035                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1036                locs.add(lr.locator)
1037            else:
1038                break
1039
1040        if prefetch:
1041            for lr in prefetch:
1042                if lr.locator not in locs:
1043                    self.parent._my_block_manager().block_prefetch(lr.locator)
1044                    locs.add(lr.locator)
1045
1046        if len(data) == 1:
1047            return data[0] if return_memoryview else data[0].tobytes()
1048        else:
1049            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1050
1051
1052    @must_be_writable
1053    @synchronized
1054    def writeto(self, offset, data, num_retries):
1055        """Write `data` to the file starting at `offset`.
1056
1057        This will update existing bytes and/or extend the size of the file as
1058        necessary.
1059
1060        """
1061        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1062            data = data.encode()
1063        if len(data) == 0:
1064            return
1065
1066        if offset > self.size():
1067            self.truncate(offset)
1068
1069        if len(data) > config.KEEP_BLOCK_SIZE:
1070            # Chunk it up into smaller writes
1071            n = 0
1072            dataview = memoryview(data)
1073            while n < len(data):
1074                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1075                n += config.KEEP_BLOCK_SIZE
1076            return
1077
1078        self.set_committed(False)
1079
1080        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1081            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1082
1083        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1084            self._current_bblock.repack_writes()
1085            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1086                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1087                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1088
1089        self._current_bblock.append(data)
1090        streams.replace_range(
1091            self._segments,
1092            offset,
1093            len(data),
1094            self._current_bblock.blockid,
1095            self._current_bblock.write_pointer - len(data),
1096        )
1097        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1098        return len(data)
1099
1100    @synchronized
1101    def flush(self, sync=True, num_retries=0):
1102        """Flush the current bufferblock to Keep.
1103
1104        :sync:
1105          If True, commit block synchronously, wait until buffer block has been written.
1106          If False, commit block asynchronously, return immediately after putting block into
1107          the keep put queue.
1108        """
1109        if self.committed():
1110            return
1111
1112        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1113            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1114                self._current_bblock.repack_writes()
1115            if self._current_bblock.state() != _BufferBlock.DELETED:
1116                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1117
1118        if sync:
1119            to_delete = set()
1120            for s in self._segments:
1121                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1122                if bb:
1123                    if bb.state() != _BufferBlock.COMMITTED:
1124                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1125                    to_delete.add(s.locator)
1126                    s.locator = bb.locator()
1127            for s in to_delete:
1128                # Don't delete the bufferblock if it's owned by many files. It'll be
1129                # deleted after all of its owners are flush()ed.
1130                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1131                    self.parent._my_block_manager().delete_bufferblock(s)
1132
1133        self.parent.notify(MOD, self.parent, self.name, (self, self))
1134
1135    @must_be_writable
1136    @synchronized
1137    def add_segment(self, blocks, pos, size):
1138        """Add a segment to the end of the file.
1139
1140        `pos` and `size` reference a section of the stream described by
1141        `blocks` (a list of Range objects)
1142
1143        """
1144        self._add_segment(blocks, pos, size)
1145
1146    def _add_segment(self, blocks, pos, size):
1147        """Internal implementation of add_segment."""
1148        self.set_committed(False)
1149        for lr in streams.locators_and_ranges(blocks, pos, size):
1150            last = self._segments[-1] if self._segments else streams.Range(0, 0, 0, 0)
1151            r = streams.Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1152            self._segments.append(r)
1153
1154    @synchronized
1155    def size(self):
1156        """Get the file size."""
1157        if self._segments:
1158            n = self._segments[-1]
1159            return n.range_start + n.range_size
1160        else:
1161            return 0
1162
1163    @synchronized
1164    def manifest_text(self, stream_name=".", portable_locators=False,
1165                      normalize=False, only_committed=False):
1166        buf = ""
1167        filestream = []
1168        for segment in self._segments:
1169            loc = segment.locator
1170            if self.parent._my_block_manager().is_bufferblock(loc):
1171                if only_committed:
1172                    continue
1173                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1174            if portable_locators:
1175                loc = KeepLocator(loc).stripped()
1176            filestream.append(streams.LocatorAndRange(
1177                loc,
1178                KeepLocator(loc).size,
1179                segment.segment_offset,
1180                segment.range_size,
1181            ))
1182        buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream}))
1183        buf += "\n"
1184        return buf
1185
1186    @must_be_writable
1187    @synchronized
1188    def _reparent(self, newparent, newname):
1189        self.set_committed(False)
1190        self.flush(sync=True)
1191        self.parent.remove(self.name)
1192        self.parent = newparent
1193        self.name = newname
1194        self.lock = self.parent.root_collection().lock
1195
1196
1197class ArvadosFileReader(ArvadosFileReaderBase):
1198    """Wraps ArvadosFile in a file-like object supporting reading only.
1199
1200    Be aware that this class is NOT thread safe as there is no locking around
1201    updating the file pointer.
1202
1203    """
1204
1205    def __init__(self, arvadosfile, mode="r", num_retries=None):
1206        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1207        self.arvadosfile = arvadosfile
1208
1209    def size(self):
1210        return self.arvadosfile.size()
1211
1212    def stream_name(self):
1213        return self.arvadosfile.parent.stream_name()
1214
1215    def readinto(self, b):
1216        data = self.read(len(b))
1217        b[:len(data)] = data
1218        return len(data)
1219
1220    @_FileLikeObjectBase._before_close
1221    @retry_method
1222    def read(self, size=-1, num_retries=None, return_memoryview=False):
1223        """Read up to `size` bytes from the file and return the result.
1224
1225        Starts at the current file position.  If `size` is negative or None,
1226        read the entire remainder of the file.
1227
1228        Returns an empty result if the file pointer is at the end of the file.
1229
1230        Returns a `bytes` object, unless `return_memoryview` is True,
1231        in which case it returns a memory view, which may avoid an
1232        unnecessary data copy in some situations.
1233
1234        """
1235        if size is None or size < 0:
1236            data = []
1237            #
1238            # specify exact=False, return_memoryview=True here so that we
1239            # only copy data once into the final buffer.
1240            #
1241            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1242            while rd:
1243                data.append(rd)
1244                self._filepos += len(rd)
1245                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1246            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1247        else:
1248            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1249            self._filepos += len(data)
1250            return data
1251
1252    @_FileLikeObjectBase._before_close
1253    @retry_method
1254    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1255        """Read up to `size` bytes from the stream, starting at the specified file offset.
1256
1257        This method does not change the file position.
1258
1259        Returns a `bytes` object, unless `return_memoryview` is True,
1260        in which case it returns a memory view, which may avoid an
1261        unnecessary data copy in some situations.
1262
1263        """
1264        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
1265
1266    def flush(self):
1267        pass
1268
1269
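# Reader sketch (the collection UUID and file name are placeholders; requires
# a configured API client): ArvadosFileReader objects are normally obtained
# from arvados.collection.Collection.open() rather than constructed directly.
#
#     import arvados.collection
#     coll = arvados.collection.Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')
#     with coll.open('output.txt', 'rb') as reader:
#         first_kib = reader.read(1024)
#         reader.seek(0)
#         for line in reader:
#             pass    # process each line
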
1270class ArvadosFileWriter(ArvadosFileReader):
1271    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1272
1273    Be aware that this class is NOT thread safe as there is no locking around
1274    updating the file pointer.
1275
1276    """
1277
1278    def __init__(self, arvadosfile, mode, num_retries=None):
1279        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1280        self.arvadosfile.add_writer(self)
1281
1282    def writable(self):
1283        return True
1284
1285    @_FileLikeObjectBase._before_close
1286    @retry_method
1287    def write(self, data, num_retries=None):
1288        if self.mode[0] == "a":
1289            self._filepos = self.size()
1290        self.arvadosfile.writeto(self._filepos, data, num_retries)
1291        self._filepos += len(data)
1292        return len(data)
1293
1294    @_FileLikeObjectBase._before_close
1295    @retry_method
1296    def writelines(self, seq, num_retries=None):
1297        for s in seq:
1298            self.write(s, num_retries=num_retries)
1299
1300    @_FileLikeObjectBase._before_close
1301    def truncate(self, size=None):
1302        if size is None:
1303            size = self._filepos
1304        self.arvadosfile.truncate(size)
1305
1306    @_FileLikeObjectBase._before_close
1307    def flush(self):
1308        self.arvadosfile.flush()
1309
1310    def close(self, flush=True):
1311        if not self.closed:
1312            self.arvadosfile.remove_writer(self, flush)
1313            super(ArvadosFileWriter, self).close()
1314
1315
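# Writer sketch (names are placeholders; requires a configured API client):
# ArvadosFileWriter objects are likewise obtained from Collection.open() with
# a writable mode, and the collection is saved once writing is finished.
#
#     import arvados.collection
#     coll = arvados.collection.Collection()
#     with coll.open('summary.txt', 'w') as writer:
#         writer.write('done\n')
#     coll.save_new(name='example output')   # upload blocks, create collection
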
1316class WrappableFile(object):
1317    """An interface to an Arvados file that's compatible with io wrappers.
1318
1319    """
1320    def __init__(self, f):
1321        self.f = f
1322        self.closed = False
1323    def close(self):
1324        self.closed = True
1325        return self.f.close()
1326    def flush(self):
1327        return self.f.flush()
1328    def read(self, *args, **kwargs):
1329        return self.f.read(*args, **kwargs)
1330    def readable(self):
1331        return self.f.readable()
1332    def readinto(self, *args, **kwargs):
1333        return self.f.readinto(*args, **kwargs)
1334    def seek(self, *args, **kwargs):
1335        return self.f.seek(*args, **kwargs)
1336    def seekable(self):
1337        return self.f.seekable()
1338    def tell(self):
1339        return self.f.tell()
1340    def writable(self):
1341        return self.f.writable()
1342    def write(self, *args, **kwargs):
1343        return self.f.write(*args, **kwargs)
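
# Wrapping sketch (illustrative; `reader` stands for an ArvadosFileReader):
# WrappableFile presents the duck-typed interface the standard io wrappers
# expect, which is roughly how Collection.open() provides text-mode access.
#
#     import io
#     text_reader = io.TextIOWrapper(WrappableFile(reader), encoding='utf-8')
#     print(text_reader.readline())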
133        cache_pos, cache_data = self._readline_cache
134        if self.tell() == cache_pos:
135            data = [cache_data]
136            self._filepos += len(cache_data)
137        else:
138            data = [b'']
139        data_size = len(data[-1])
140        while (data_size < size) and (b'\n' not in data[-1]):
141            next_read = self.read(2 ** 20, num_retries=num_retries)
142            if not next_read:
143                break
144            data.append(next_read)
145            data_size += len(next_read)
146        data = b''.join(data)
147        try:
148            nextline_index = data.index(b'\n') + 1
149        except ValueError:
150            nextline_index = len(data)
151        nextline_index = min(nextline_index, size)
152        self._filepos -= len(data) - nextline_index
153        self._readline_cache = (self.tell(), data[nextline_index:])
154        return data[:nextline_index].decode()
@retry_method
def decompress(self, decompress, size, num_retries=None):
156    @_FileLikeObjectBase._before_close
157    @retry_method
158    def decompress(self, decompress, size, num_retries=None):
159        for segment in self.readall(size, num_retries=num_retries):
160            data = decompress(segment)
161            if data:
162                yield data
@retry_method
def readall_decompressed(self, size=1048576, num_retries=None):
164    @_FileLikeObjectBase._before_close
165    @retry_method
166    def readall_decompressed(self, size=2**20, num_retries=None):
167        self.seek(0)
168        if self.name.endswith('.bz2'):
169            dc = bz2.BZ2Decompressor()
170            return self.decompress(dc.decompress, size,
171                                   num_retries=num_retries)
172        elif self.name.endswith('.gz'):
173            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
174            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
175                                   size, num_retries=num_retries)
176        else:
177            return self.readall(size, num_retries=num_retries)
@retry_method
def readlines(self, sizehint=inf, num_retries=None):
179    @_FileLikeObjectBase._before_close
180    @retry_method
181    def readlines(self, sizehint=float('inf'), num_retries=None):
182        data = []
183        data_size = 0
184        for s in self.readall(num_retries=num_retries):
185            data.append(s)
186            data_size += len(s)
187            if data_size >= sizehint:
188                break
189        return b''.join(data).decode().splitlines(True)
def size(self):
191    def size(self):
192        raise IOError(errno.ENOSYS, "Not implemented")
def read(self, size, num_retries=None):
194    def read(self, size, num_retries=None):
195        raise IOError(errno.ENOSYS, "Not implemented")
def readfrom(self, start, size, num_retries=None):
197    def readfrom(self, start, size, num_retries=None):
198        raise IOError(errno.ENOSYS, "Not implemented")
def synchronized(orig_func):
201def synchronized(orig_func):
202    @functools.wraps(orig_func)
203    def synchronized_wrapper(self, *args, **kwargs):
204        with self.lock:
205            return orig_func(self, *args, **kwargs)
206    return synchronized_wrapper
class StateChangeError(builtins.Exception):
209class StateChangeError(Exception):
210    def __init__(self, message, state, nextstate):
211        super(StateChangeError, self).__init__(message)
212        self.state = state
213        self.nextstate = nextstate

Common base class for all non-exit exceptions.

StateChangeError(message, state, nextstate)
210    def __init__(self, message, state, nextstate):
211        super(StateChangeError, self).__init__(message)
212        self.state = state
213        self.nextstate = nextstate
state
nextstate
Inherited Members
builtins.BaseException
with_traceback
args
class NoopLock:
385class NoopLock(object):
386    def __enter__(self):
387        return self
388
389    def __exit__(self, exc_type, exc_value, traceback):
390        pass
391
392    def acquire(self, blocking=False):
393        pass
394
395    def release(self):
396        pass
def acquire(self, blocking=False):
392    def acquire(self, blocking=False):
393        pass
def release(self):
395    def release(self):
396        pass
def must_be_writable(orig_func):
399def must_be_writable(orig_func):
400    @functools.wraps(orig_func)
401    def must_be_writable_wrapper(self, *args, **kwargs):
402        if not self.writable():
403            raise IOError(errno.EROFS, "Collection is read-only.")
404        return orig_func(self, *args, **kwargs)
405    return must_be_writable_wrapper
class ArvadosFile:
 751class ArvadosFile(object):
 752    """Represent a file in a Collection.
 753
 754    ArvadosFile manages the underlying representation of a file in Keep as a
 755    sequence of segments spanning a set of blocks, and implements random
 756    read/write access.
 757
 758    This object may be accessed from multiple threads.
 759
 760    """
 761
 762    __slots__ = ('parent', 'name', '_writers', '_committed',
 763                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 764
 765    def __init__(self, parent, name, stream=[], segments=[]):
 766        """
 767        ArvadosFile constructor.
 768
 769        :stream:
 770          a list of Range objects representing a block stream
 771
 772        :segments:
 773          a list of Range objects representing segments
 774        """
 775        self.parent = parent
 776        self.name = name
 777        self._writers = set()
 778        self._committed = False
 779        self._segments = []
 780        self.lock = parent.root_collection().lock
 781        for s in segments:
 782            self._add_segment(stream, s.locator, s.range_size)
 783        self._current_bblock = None
 784        self._read_counter = 0
 785
 786    def writable(self):
 787        return self.parent.writable()
 788
 789    @synchronized
 790    def permission_expired(self, as_of_dt=None):
 791        """Returns True if any of the segment's locators is expired"""
 792        for r in self._segments:
 793            if KeepLocator(r.locator).permission_expired(as_of_dt):
 794                return True
 795        return False
 796
 797    @synchronized
 798    def has_remote_blocks(self):
 799        """Returns True if any of the segment's locators has a +R signature"""
 800
 801        for s in self._segments:
 802            if '+R' in s.locator:
 803                return True
 804        return False
 805
 806    @synchronized
 807    def _copy_remote_blocks(self, remote_blocks={}):
 808        """Ask Keep to copy remote blocks and point to their local copies.
 809
 810        This is called from the parent Collection.
 811
 812        :remote_blocks:
 813            Shared cache of remote to local block mappings. This is used to avoid
 814            doing extra work when blocks are shared by more than one file in
 815            different subdirectories.
 816        """
 817
 818        for s in self._segments:
 819            if '+R' in s.locator:
 820                try:
 821                    loc = remote_blocks[s.locator]
 822                except KeyError:
 823                    loc = self.parent._my_keep().refresh_signature(s.locator)
 824                    remote_blocks[s.locator] = loc
 825                s.locator = loc
 826                self.parent.set_committed(False)
 827        return remote_blocks
 828
 829    @synchronized
 830    def segments(self):
 831        return copy.copy(self._segments)
 832
 833    @synchronized
 834    def clone(self, new_parent, new_name):
 835        """Make a copy of this file."""
 836        cp = ArvadosFile(new_parent, new_name)
 837        cp.replace_contents(self)
 838        return cp
 839
 840    @must_be_writable
 841    @synchronized
 842    def replace_contents(self, other):
 843        """Replace segments of this file with segments from another `ArvadosFile` object."""
 844
 845        map_loc = {}
 846        self._segments = []
 847        for other_segment in other.segments():
 848            new_loc = other_segment.locator
 849            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
 850                if other_segment.locator not in map_loc:
 851                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
 852                    if bufferblock.state() != _BufferBlock.WRITABLE:
 853                        map_loc[other_segment.locator] = bufferblock.locator()
 854                    else:
 855                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
 856                new_loc = map_loc[other_segment.locator]
 857
 858            self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 859
 860        self.set_committed(False)
 861
 862    def __eq__(self, other):
 863        if other is self:
 864            return True
 865        if not isinstance(other, ArvadosFile):
 866            return False
 867
 868        othersegs = other.segments()
 869        with self.lock:
 870            if len(self._segments) != len(othersegs):
 871                return False
 872            for i in range(0, len(othersegs)):
 873                seg1 = self._segments[i]
 874                seg2 = othersegs[i]
 875                loc1 = seg1.locator
 876                loc2 = seg2.locator
 877
 878                if self.parent._my_block_manager().is_bufferblock(loc1):
 879                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
 880
 881                if other.parent._my_block_manager().is_bufferblock(loc2):
 882                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
 883
 884                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
 885                    seg1.range_start != seg2.range_start or
 886                    seg1.range_size != seg2.range_size or
 887                    seg1.segment_offset != seg2.segment_offset):
 888                    return False
 889
 890        return True
 891
 892    def __ne__(self, other):
 893        return not self.__eq__(other)
 894
 895    @synchronized
 896    def set_segments(self, segs):
 897        self._segments = segs
 898
 899    @synchronized
 900    def set_committed(self, value=True):
 901        """Set committed flag.
 902
 903        If value is True, set committed to be True.
 904
 905        If value is False, set committed to be False for this and all parents.
 906        """
 907        if value == self._committed:
 908            return
 909        self._committed = value
 910        if self._committed is False and self.parent is not None:
 911            self.parent.set_committed(False)
 912
 913    @synchronized
 914    def committed(self):
 915        """Get whether this is committed or not."""
 916        return self._committed
 917
 918    @synchronized
 919    def add_writer(self, writer):
 920        """Add an ArvadosFileWriter reference to the list of writers"""
 921        if isinstance(writer, ArvadosFileWriter):
 922            self._writers.add(writer)
 923
 924    @synchronized
 925    def remove_writer(self, writer, flush):
 926        """
 927        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 928        and do some block maintenance tasks.
 929        """
 930        self._writers.remove(writer)
 931
 932        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 933            # File writer closed, not small enough for repacking
 934            self.flush()
 935        elif self.closed():
 936            # All writers closed and size is adequate for repacking
 937            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 938
 939    def closed(self):
 940        """
 941        Get whether this is closed or not. When the writers list is empty, the file
 942        is supposed to be closed.
 943        """
 944        return len(self._writers) == 0
 945
 946    @must_be_writable
 947    @synchronized
 948    def truncate(self, size):
 949        """Shrink or expand the size of the file.
 950
 951        If `size` is less than the size of the file, the file contents after
 952        `size` will be discarded.  If `size` is greater than the current size
 953        of the file, it will be filled with zero bytes.
 954
 955        """
 956        if size < self.size():
 957            new_segs = []
 958            for r in self._segments:
 959                range_end = r.range_start+r.range_size
 960                if r.range_start >= size:
 961                    # segment is past the truncate size, all done
 962                    break
 963                elif size < range_end:
 964                    nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0)
 965                    nr.segment_offset = r.segment_offset
 966                    new_segs.append(nr)
 967                    break
 968                else:
 969                    new_segs.append(r)
 970
 971            self._segments = new_segs
 972            self.set_committed(False)
 973        elif size > self.size():
 974            padding = self.parent._my_block_manager().get_padding_block()
 975            diff = size - self.size()
 976            while diff > config.KEEP_BLOCK_SIZE:
 977                self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
 978                diff -= config.KEEP_BLOCK_SIZE
 979            if diff > 0:
 980                self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0))
 981            self.set_committed(False)
 982        else:
 983            # size == self.size()
 984            pass
 985
 986    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
 987        """Read up to `size` bytes from the file starting at `offset`.
 988
 989        Arguments:
 990
 991        * exact: bool --- If False (default), return less data than
 992         requested if the read crosses a block boundary and the next
 993         block isn't cached.  If True, only return less data than
 994         requested when hitting EOF.
 995
 996        * return_memoryview: bool --- If False (default) return a
 997          `bytes` object, which may entail making a copy in some
 998          situations.  If True, return a `memoryview` object which may
 999          avoid making a copy, but may be incompatible with code
1000          expecting a `bytes` object.
1001
1002        """
1003
1004        with self.lock:
1005            if size == 0 or offset >= self.size():
1006                return memoryview(b'') if return_memoryview else b''
1007            readsegs = streams.locators_and_ranges(self._segments, offset, size)
1008
1009            prefetch = None
1010            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1011            if prefetch_lookahead:
1012                # Doing prefetch on every read() call is surprisingly expensive
1013                # when we're trying to deliver data at 600+ MiBps and want
1014                # the read() fast path to be as lightweight as possible.
1015                #
1016                # Only prefetching every 128 read operations
1017                # dramatically reduces the overhead while still
1018                # getting the benefit of prefetching (e.g. when
1019                # reading 128 KiB at a time, it checks for prefetch
1020                # every 16 MiB).
1021                self._read_counter = (self._read_counter+1) % 128
1022                if self._read_counter == 1:
1023                    prefetch = streams.locators_and_ranges(
1024                        self._segments,
1025                        offset + size,
1026                        config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1027                        limit=(1+prefetch_lookahead),
1028                    )
1029
1030        locs = set()
1031        data = []
1032        for lr in readsegs:
1033            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1034            if block:
1035                blockview = memoryview(block)
1036                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1037                locs.add(lr.locator)
1038            else:
1039                break
1040
1041        if prefetch:
1042            for lr in prefetch:
1043                if lr.locator not in locs:
1044                    self.parent._my_block_manager().block_prefetch(lr.locator)
1045                    locs.add(lr.locator)
1046
1047        if len(data) == 1:
1048            return data[0] if return_memoryview else data[0].tobytes()
1049        else:
1050            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1051
1052
1053    @must_be_writable
1054    @synchronized
1055    def writeto(self, offset, data, num_retries):
1056        """Write `data` to the file starting at `offset`.
1057
1058        This will update existing bytes and/or extend the size of the file as
1059        necessary.
1060
1061        """
1062        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1063            data = data.encode()
1064        if len(data) == 0:
1065            return
1066
1067        if offset > self.size():
1068            self.truncate(offset)
1069
1070        if len(data) > config.KEEP_BLOCK_SIZE:
1071            # Chunk it up into smaller writes
1072            n = 0
1073            dataview = memoryview(data)
1074            while n < len(data):
1075                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1076                n += config.KEEP_BLOCK_SIZE
1077            return
1078
1079        self.set_committed(False)
1080
1081        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1082            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1083
1084        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1085            self._current_bblock.repack_writes()
1086            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1087                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1088                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1089
1090        self._current_bblock.append(data)
1091        streams.replace_range(
1092            self._segments,
1093            offset,
1094            len(data),
1095            self._current_bblock.blockid,
1096            self._current_bblock.write_pointer - len(data),
1097        )
1098        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1099        return len(data)
1100
1101    @synchronized
1102    def flush(self, sync=True, num_retries=0):
1103        """Flush the current bufferblock to Keep.
1104
1105        :sync:
1106          If True, commit block synchronously, wait until buffer block has been written.
1107          If False, commit block asynchronously, return immediately after putting block into
1108          the keep put queue.
1109        """
1110        if self.committed():
1111            return
1112
1113        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1114            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1115                self._current_bblock.repack_writes()
1116            if self._current_bblock.state() != _BufferBlock.DELETED:
1117                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1118
1119        if sync:
1120            to_delete = set()
1121            for s in self._segments:
1122                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1123                if bb:
1124                    if bb.state() != _BufferBlock.COMMITTED:
1125                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1126                    to_delete.add(s.locator)
1127                    s.locator = bb.locator()
1128            for s in to_delete:
1129                # Don't delete the bufferblock if it's owned by many files. It'll be
1130                # deleted after all of its owners are flush()ed.
1131                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1132                    self.parent._my_block_manager().delete_bufferblock(s)
1133
1134        self.parent.notify(MOD, self.parent, self.name, (self, self))
1135
1136    @must_be_writable
1137    @synchronized
1138    def add_segment(self, blocks, pos, size):
1139        """Add a segment to the end of the file.
1140
1141        `pos` and `offset` reference a section of the stream described by
1142        `blocks` (a list of Range objects)
1143
1144        """
1145        self._add_segment(blocks, pos, size)
1146
1147    def _add_segment(self, blocks, pos, size):
1148        """Internal implementation of add_segment."""
1149        self.set_committed(False)
1150        for lr in streams.locators_and_ranges(blocks, pos, size):
1151            last = self._segments[-1] if self._segments else streams.Range(0, 0, 0, 0)
1152            r = streams.Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1153            self._segments.append(r)
1154
1155    @synchronized
1156    def size(self):
1157        """Get the file size."""
1158        if self._segments:
1159            n = self._segments[-1]
1160            return n.range_start + n.range_size
1161        else:
1162            return 0
1163
1164    @synchronized
1165    def manifest_text(self, stream_name=".", portable_locators=False,
1166                      normalize=False, only_committed=False):
1167        buf = ""
1168        filestream = []
1169        for segment in self._segments:
1170            loc = segment.locator
1171            if self.parent._my_block_manager().is_bufferblock(loc):
1172                if only_committed:
1173                    continue
1174                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1175            if portable_locators:
1176                loc = KeepLocator(loc).stripped()
1177            filestream.append(streams.LocatorAndRange(
1178                loc,
1179                KeepLocator(loc).size,
1180                segment.segment_offset,
1181                segment.range_size,
1182            ))
1183        buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream}))
1184        buf += "\n"
1185        return buf
1186
1187    @must_be_writable
1188    @synchronized
1189    def _reparent(self, newparent, newname):
1190        self.set_committed(False)
1191        self.flush(sync=True)
1192        self.parent.remove(self.name)
1193        self.parent = newparent
1194        self.name = newname
1195        self.lock = self.parent.root_collection().lock

Represent a file in a Collection.

ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.

This object may be accessed from multiple threads.
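
ArvadosFile objects are not normally constructed directly; they are reached through a collection. A minimal sketch, assuming the Collection API from arvados.collection (which is not documented on this page):

    import arvados.collection

    coll = arvados.collection.Collection()       # new in-memory, writable collection (assumed API)
    with coll.open('data.txt', 'wb') as f:       # file-like writer for a new file
        f.write(b'hello\n')
    arvfile = coll.find('data.txt')              # the underlying ArvadosFile (assumed API)
    print(arvfile.size())                        # -> 6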

ArvadosFile(parent, name, stream=[], segments=[])
765    def __init__(self, parent, name, stream=[], segments=[]):
766        """
767        ArvadosFile constructor.
768
769        :stream:
770          a list of Range objects representing a block stream
771
772        :segments:
773          a list of Range objects representing segments
774        """
775        self.parent = parent
776        self.name = name
777        self._writers = set()
778        self._committed = False
779        self._segments = []
780        self.lock = parent.root_collection().lock
781        for s in segments:
782            self._add_segment(stream, s.locator, s.range_size)
783        self._current_bblock = None
784        self._read_counter = 0

ArvadosFile constructor.

:stream: a list of Range objects representing a block stream

:segments: a list of Range objects representing segments

parent
name
lock
def writable(self):
786    def writable(self):
787        return self.parent.writable()
@synchronized
def permission_expired(self, as_of_dt=None):
789    @synchronized
790    def permission_expired(self, as_of_dt=None):
791        """Returns True if any of the segment's locators is expired"""
792        for r in self._segments:
793            if KeepLocator(r.locator).permission_expired(as_of_dt):
794                return True
795        return False

Returns True if any of the segments’ locators has expired.

@synchronized
def has_remote_blocks(self):
797    @synchronized
798    def has_remote_blocks(self):
799        """Returns True if any of the segment's locators has a +R signature"""
800
801        for s in self._segments:
802            if '+R' in s.locator:
803                return True
804        return False

Returns True if any of the segments’ locators has a +R signature.

@synchronized
def segments(self):
829    @synchronized
830    def segments(self):
831        return copy.copy(self._segments)
@synchronized
def clone(self, new_parent, new_name):
833    @synchronized
834    def clone(self, new_parent, new_name):
835        """Make a copy of this file."""
836        cp = ArvadosFile(new_parent, new_name)
837        cp.replace_contents(self)
838        return cp

Make a copy of this file.

@must_be_writable
@synchronized
def replace_contents(self, other):
840    @must_be_writable
841    @synchronized
842    def replace_contents(self, other):
843        """Replace segments of this file with segments from another `ArvadosFile` object."""
844
845        map_loc = {}
846        self._segments = []
847        for other_segment in other.segments():
848            new_loc = other_segment.locator
849            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
850                if other_segment.locator not in map_loc:
851                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
852                    if bufferblock.state() != _BufferBlock.WRITABLE:
853                        map_loc[other_segment.locator] = bufferblock.locator()
854                    else:
855                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
856                new_loc = map_loc[other_segment.locator]
857
858            self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
859
860        self.set_committed(False)

Replace segments of this file with segments from another ArvadosFile object.

@synchronized
def set_segments(self, segs):
895    @synchronized
896    def set_segments(self, segs):
897        self._segments = segs
@synchronized
def set_committed(self, value=True):
899    @synchronized
900    def set_committed(self, value=True):
901        """Set committed flag.
902
903        If value is True, set committed to be True.
904
905        If value is False, set committed to be False for this and all parents.
906        """
907        if value == self._committed:
908            return
909        self._committed = value
910        if self._committed is False and self.parent is not None:
911            self.parent.set_committed(False)

Set committed flag.

If value is True, set committed to be True.

If value is False, set committed to be False for this and all parents.

@synchronized
def committed(self):
913    @synchronized
914    def committed(self):
915        """Get whether this is committed or not."""
916        return self._committed

Get whether this is committed or not.

@synchronized
def add_writer(self, writer):
918    @synchronized
919    def add_writer(self, writer):
920        """Add an ArvadosFileWriter reference to the list of writers"""
921        if isinstance(writer, ArvadosFileWriter):
922            self._writers.add(writer)

Add an ArvadosFileWriter reference to the list of writers

@synchronized
def remove_writer(self, writer, flush):
924    @synchronized
925    def remove_writer(self, writer, flush):
926        """
927        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
928        and do some block maintenance tasks.
929        """
930        self._writers.remove(writer)
931
932        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
933            # File writer closed, not small enough for repacking
934            self.flush()
935        elif self.closed():
936            # All writers closed and size is adequate for repacking
937            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks.

def closed(self):
939    def closed(self):
940        """
941        Get whether this is closed or not. When the writers list is empty, the file
942        is supposed to be closed.
943        """
944        return len(self._writers) == 0

Get whether this file is closed. The file is considered closed when its writers list is empty.

@must_be_writable
@synchronized
def truncate(self, size):
946    @must_be_writable
947    @synchronized
948    def truncate(self, size):
949        """Shrink or expand the size of the file.
950
951        If `size` is less than the size of the file, the file contents after
952        `size` will be discarded.  If `size` is greater than the current size
953        of the file, it will be filled with zero bytes.
954
955        """
956        if size < self.size():
957            new_segs = []
958            for r in self._segments:
959                range_end = r.range_start+r.range_size
960                if r.range_start >= size:
961                    # segment is past the truncate size, all done
962                    break
963                elif size < range_end:
964                    nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0)
965                    nr.segment_offset = r.segment_offset
966                    new_segs.append(nr)
967                    break
968                else:
969                    new_segs.append(r)
970
971            self._segments = new_segs
972            self.set_committed(False)
973        elif size > self.size():
974            padding = self.parent._my_block_manager().get_padding_block()
975            diff = size - self.size()
976            while diff > config.KEEP_BLOCK_SIZE:
977                self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
978                diff -= config.KEEP_BLOCK_SIZE
979            if diff > 0:
980                self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0))
981            self.set_committed(False)
982        else:
983            # size == self.size()
984            pass

Shrink or expand the size of the file.

If size is less than the size of the file, the file contents after size will be discarded. If size is greater than the current size of the file, it will be filled with zero bytes.
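
A short sketch, continuing the collection example above (truncating requires the parent collection to be writable):

    # arvfile currently holds b'hello\n' (6 bytes)
    arvfile.truncate(4)                        # keep only b'hell'
    arvfile.truncate(10)                       # grow to 10 bytes; the added bytes read back as zeros
    arvfile.readfrom(0, 10, num_retries=0)     # -> b'hell' + b'\x00' * 6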

def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
 986    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
 987        """Read up to `size` bytes from the file starting at `offset`.
 988
 989        Arguments:
 990
 991        * exact: bool --- If False (default), return less data than
 992         requested if the read crosses a block boundary and the next
 993         block isn't cached.  If True, only return less data than
 994         requested when hitting EOF.
 995
 996        * return_memoryview: bool --- If False (default) return a
 997          `bytes` object, which may entail making a copy in some
 998          situations.  If True, return a `memoryview` object which may
 999          avoid making a copy, but may be incompatible with code
1000          expecting a `bytes` object.
1001
1002        """
1003
1004        with self.lock:
1005            if size == 0 or offset >= self.size():
1006                return memoryview(b'') if return_memoryview else b''
1007            readsegs = streams.locators_and_ranges(self._segments, offset, size)
1008
1009            prefetch = None
1010            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1011            if prefetch_lookahead:
1012                # Doing prefetch on every read() call is surprisingly expensive
1013                # when we're trying to deliver data at 600+ MiBps and want
1014                # the read() fast path to be as lightweight as possible.
1015                #
1016                # Only prefetching every 128 read operations
1017                # dramatically reduces the overhead while still
1018                # getting the benefit of prefetching (e.g. when
1019                # reading 128 KiB at a time, it checks for prefetch
1020                # every 16 MiB).
1021                self._read_counter = (self._read_counter+1) % 128
1022                if self._read_counter == 1:
1023                    prefetch = streams.locators_and_ranges(
1024                        self._segments,
1025                        offset + size,
1026                        config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1027                        limit=(1+prefetch_lookahead),
1028                    )
1029
1030        locs = set()
1031        data = []
1032        for lr in readsegs:
1033            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1034            if block:
1035                blockview = memoryview(block)
1036                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1037                locs.add(lr.locator)
1038            else:
1039                break
1040
1041        if prefetch:
1042            for lr in prefetch:
1043                if lr.locator not in locs:
1044                    self.parent._my_block_manager().block_prefetch(lr.locator)
1045                    locs.add(lr.locator)
1046
1047        if len(data) == 1:
1048            return data[0] if return_memoryview else data[0].tobytes()
1049        else:
1050            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)

Read up to size bytes from the file starting at offset.

Arguments:

  • exact: bool — If False (default), return less data than requested if the read crosses a block boundary and the next block isn’t cached. If True, only return less data than requested when hitting EOF.

  • return_memoryview: bool — If False (default) return a bytes object, which may entail making a copy in some situations. If True, return a memoryview object which may avoid making a copy, but may be incompatible with code expecting a bytes object.
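
For example, continuing the sketch above:

    data = arvfile.readfrom(0, 4, num_retries=0, exact=True)       # -> b'hell'
    view = arvfile.readfrom(0, 4, num_retries=0, exact=True,
                            return_memoryview=True)
    assert bytes(view) == data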

@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
1053    @must_be_writable
1054    @synchronized
1055    def writeto(self, offset, data, num_retries):
1056        """Write `data` to the file starting at `offset`.
1057
1058        This will update existing bytes and/or extend the size of the file as
1059        necessary.
1060
1061        """
1062        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1063            data = data.encode()
1064        if len(data) == 0:
1065            return
1066
1067        if offset > self.size():
1068            self.truncate(offset)
1069
1070        if len(data) > config.KEEP_BLOCK_SIZE:
1071            # Chunk it up into smaller writes
1072            n = 0
1073            dataview = memoryview(data)
1074            while n < len(data):
1075                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1076                n += config.KEEP_BLOCK_SIZE
1077            return
1078
1079        self.set_committed(False)
1080
1081        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1082            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1083
1084        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1085            self._current_bblock.repack_writes()
1086            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1087                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1088                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1089
1090        self._current_bblock.append(data)
1091        streams.replace_range(
1092            self._segments,
1093            offset,
1094            len(data),
1095            self._current_bblock.blockid,
1096            self._current_bblock.write_pointer - len(data),
1097        )
1098        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1099        return len(data)

Write data to the file starting at offset.

This will update existing bytes and/or extend the size of the file as necessary.
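
Continuing the sketch above:

    arvfile.writeto(0, b'HELL', num_retries=0)              # overwrite existing bytes in place
    arvfile.writeto(arvfile.size(), b'!', num_retries=0)    # append at the current end of file
    arvfile.writeto(20, b'?', num_retries=0)                # past EOF: the gap is zero-filled first
    arvfile.size()                                          # -> 21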

@synchronized
def flush(self, sync=True, num_retries=0):
1101    @synchronized
1102    def flush(self, sync=True, num_retries=0):
1103        """Flush the current bufferblock to Keep.
1104
1105        :sync:
1106          If True, commit block synchronously, wait until buffer block has been written.
1107          If False, commit block asynchronously, return immediately after putting block into
1108          the keep put queue.
1109        """
1110        if self.committed():
1111            return
1112
1113        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1114            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1115                self._current_bblock.repack_writes()
1116            if self._current_bblock.state() != _BufferBlock.DELETED:
1117                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1118
1119        if sync:
1120            to_delete = set()
1121            for s in self._segments:
1122                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1123                if bb:
1124                    if bb.state() != _BufferBlock.COMMITTED:
1125                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1126                    to_delete.add(s.locator)
1127                    s.locator = bb.locator()
1128            for s in to_delete:
1129                # Don't delete the bufferblock if it's owned by many files. It'll be
1130                # deleted after all of its owners are flush()ed.
1131                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1132                    self.parent._my_block_manager().delete_bufferblock(s)
1133
1134        self.parent.notify(MOD, self.parent, self.name, (self, self))

Flush the current bufferblock to Keep.

:sync: If True, commit block synchronously, wait until buffer block has been written. If False, commit block asynchronously, return immediately after putting block into the keep put queue.
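
A sketch of both modes (in a real session this uploads data to Keep, so it needs a configured Keep client):

    arvfile.flush(sync=False)   # queue the current bufferblock for upload and return immediately
    arvfile.flush(sync=True)    # block until every segment refers to a committed Keep block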

@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
1136    @must_be_writable
1137    @synchronized
1138    def add_segment(self, blocks, pos, size):
1139        """Add a segment to the end of the file.
1140
1141        `pos` and `offset` reference a section of the stream described by
1142        `blocks` (a list of Range objects)
1143
1144        """
1145        self._add_segment(blocks, pos, size)

Add a segment to the end of the file.

pos and size reference a section of the stream described by blocks (a list of Range objects).

@synchronized
def size(self):
1155    @synchronized
1156    def size(self):
1157        """Get the file size."""
1158        if self._segments:
1159            n = self._segments[-1]
1160            return n.range_start + n.range_size
1161        else:
1162            return 0

Get the file size.

@synchronized
def manifest_text(self, stream_name='.', portable_locators=False, normalize=False, only_committed=False):
1164    @synchronized
1165    def manifest_text(self, stream_name=".", portable_locators=False,
1166                      normalize=False, only_committed=False):
1167        buf = ""
1168        filestream = []
1169        for segment in self._segments:
1170            loc = segment.locator
1171            if self.parent._my_block_manager().is_bufferblock(loc):
1172                if only_committed:
1173                    continue
1174                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1175            if portable_locators:
1176                loc = KeepLocator(loc).stripped()
1177            filestream.append(streams.LocatorAndRange(
1178                loc,
1179                KeepLocator(loc).size,
1180                segment.segment_offset,
1181                segment.range_size,
1182            ))
1183        buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream}))
1184        buf += "\n"
1185        return buf
fuse_entry
class ArvadosFileReader(ArvadosFileReaderBase):
1198class ArvadosFileReader(ArvadosFileReaderBase):
1199    """Wraps ArvadosFile in a file-like object supporting reading only.
1200
1201    Be aware that this class is NOT thread safe as there is no locking around
1202    updating file pointer.
1203
1204    """
1205
1206    def __init__(self, arvadosfile, mode="r", num_retries=None):
1207        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1208        self.arvadosfile = arvadosfile
1209
1210    def size(self):
1211        return self.arvadosfile.size()
1212
1213    def stream_name(self):
1214        return self.arvadosfile.parent.stream_name()
1215
1216    def readinto(self, b):
1217        data = self.read(len(b))
1218        b[:len(data)] = data
1219        return len(data)
1220
1221    @_FileLikeObjectBase._before_close
1222    @retry_method
1223    def read(self, size=-1, num_retries=None, return_memoryview=False):
1224        """Read up to `size` bytes from the file and return the result.
1225
1226        Starts at the current file position.  If `size` is negative or None,
1227        read the entire remainder of the file.
1228
1229        Returns None if the file pointer is at the end of the file.
1230
1231        Returns a `bytes` object, unless `return_memoryview` is True,
1232        in which case it returns a memory view, which may avoid an
1233        unnecessary data copy in some situations.
1234
1235        """
1236        if size < 0 or size is None:
1237            data = []
1238            #
1239            # specify exact=False, return_memoryview=True here so that we
1240            # only copy data once into the final buffer.
1241            #
1242            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1243            while rd:
1244                data.append(rd)
1245                self._filepos += len(rd)
1246                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1247            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1248        else:
1249            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1250            self._filepos += len(data)
1251            return data
1252
1253    @_FileLikeObjectBase._before_close
1254    @retry_method
1255    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1256        """Read up to `size` bytes from the stream, starting at the specified file offset.
1257
1258        This method does not change the file position.
1259
1260        Returns a `bytes` object, unless `return_memoryview` is True,
1261        in which case it returns a memory view, which may avoid an
1262        unnecessary data copy in some situations.
1263
1264        """
1265        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
1266
1267    def flush(self):
1268        pass

Wraps ArvadosFile in a file-like object supporting reading only.

Be aware that this class is NOT thread safe, as there is no locking around updates to the file pointer.
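
A minimal reading sketch, reusing the arvfile object from the ArvadosFile sketch above:

    from arvados.arvfile import ArvadosFileReader

    reader = ArvadosFileReader(arvfile, num_retries=3)
    print(reader.size())          # total file size in bytes
    chunk = reader.read(4)        # bytes; advances the file position
    reader.seek(0)
    for line in reader:           # iteration yields decoded text lines
        print(line.rstrip())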

ArvadosFileReader(arvadosfile, mode='r', num_retries=None)
1206    def __init__(self, arvadosfile, mode="r", num_retries=None):
1207        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1208        self.arvadosfile = arvadosfile
arvadosfile
def size(self):
1210    def size(self):
1211        return self.arvadosfile.size()
def stream_name(self):
1213    def stream_name(self):
1214        return self.arvadosfile.parent.stream_name()
def readinto(self, b):
1216    def readinto(self, b):
1217        data = self.read(len(b))
1218        b[:len(data)] = data
1219        return len(data)
@retry_method
def read(self, size=-1, num_retries=None, return_memoryview=False):
1221    @_FileLikeObjectBase._before_close
1222    @retry_method
1223    def read(self, size=-1, num_retries=None, return_memoryview=False):
1224        """Read up to `size` bytes from the file and return the result.
1225
1226        Starts at the current file position.  If `size` is negative or None,
1227        read the entire remainder of the file.
1228
1229        Returns None if the file pointer is at the end of the file.
1230
1231        Returns a `bytes` object, unless `return_memoryview` is True,
1232        in which case it returns a memory view, which may avoid an
1233        unnecessary data copy in some situations.
1234
1235        """
1236        if size < 0 or size is None:
1237            data = []
1238            #
1239            # specify exact=False, return_memoryview=True here so that we
1240            # only copy data once into the final buffer.
1241            #
1242            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1243            while rd:
1244                data.append(rd)
1245                self._filepos += len(rd)
1246                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1247            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1248        else:
1249            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1250            self._filepos += len(data)
1251            return data

Read up to size bytes from the file and return the result.

Starts at the current file position. If size is negative or None, read the entire remainder of the file.

Returns an empty result if the file pointer is already at the end of the file.

Returns a bytes object, unless return_memoryview is True, in which case it returns a memory view, which may avoid an unnecessary data copy in some situations.
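
For example:

    reader.seek(0)
    everything = reader.read()                       # size defaults to -1: read to EOF, returns bytes
    reader.seek(0)
    view = reader.read(return_memoryview=True)       # the same data as a memoryview
    assert bytes(view) == everything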

@retry_method
def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1253    @_FileLikeObjectBase._before_close
1254    @retry_method
1255    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1256        """Read up to `size` bytes from the stream, starting at the specified file offset.
1257
1258        This method does not change the file position.
1259
1260        Returns a `bytes` object, unless `return_memoryview` is True,
1261        in which case it returns a memory view, which may avoid an
1262        unnecessary data copy in some situations.
1263
1264        """
1265        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)

Read up to size bytes from the stream, starting at the specified file offset.

This method does not change the file position.

Returns a bytes object, unless return_memoryview is True, in which case it returns a memory view, which may avoid an unnecessary data copy in some situations.

def flush(self):
1267    def flush(self):
1268        pass
class ArvadosFileWriter(ArvadosFileReader):
1271class ArvadosFileWriter(ArvadosFileReader):
1272    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1273
1274    Be aware that this class is NOT thread safe as there is no locking around
1275    updating file pointer.
1276
1277    """
1278
1279    def __init__(self, arvadosfile, mode, num_retries=None):
1280        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1281        self.arvadosfile.add_writer(self)
1282
1283    def writable(self):
1284        return True
1285
1286    @_FileLikeObjectBase._before_close
1287    @retry_method
1288    def write(self, data, num_retries=None):
1289        if self.mode[0] == "a":
1290            self._filepos = self.size()
1291        self.arvadosfile.writeto(self._filepos, data, num_retries)
1292        self._filepos += len(data)
1293        return len(data)
1294
1295    @_FileLikeObjectBase._before_close
1296    @retry_method
1297    def writelines(self, seq, num_retries=None):
1298        for s in seq:
1299            self.write(s, num_retries=num_retries)
1300
1301    @_FileLikeObjectBase._before_close
1302    def truncate(self, size=None):
1303        if size is None:
1304            size = self._filepos
1305        self.arvadosfile.truncate(size)
1306
1307    @_FileLikeObjectBase._before_close
1308    def flush(self):
1309        self.arvadosfile.flush()
1310
1311    def close(self, flush=True):
1312        if not self.closed:
1313            self.arvadosfile.remove_writer(self, flush)
1314            super(ArvadosFileWriter, self).close()

Wraps ArvadosFile in a file-like object supporting both reading and writing.

Be aware that this class is NOT thread safe, as there is no locking around updates to the file pointer.
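
A writing sketch, again assuming arvfile belongs to a writable collection as above:

    from arvados.arvfile import ArvadosFileWriter

    writer = ArvadosFileWriter(arvfile, mode='ab', num_retries=3)   # 'a': every write appends at EOF
    writer.write(b'appended line\n')
    writer.writelines([b'one\n', b'two\n'])
    writer.flush()    # commit the current bufferblock to Keep
    writer.close()    # drop this writer; may trigger small-block repacking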

ArvadosFileWriter(arvadosfile, mode, num_retries=None)
1279    def __init__(self, arvadosfile, mode, num_retries=None):
1280        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1281        self.arvadosfile.add_writer(self)
def writable(self):
1283    def writable(self):
1284        return True
@retry_method
def write(self, data, num_retries=None):
1286    @_FileLikeObjectBase._before_close
1287    @retry_method
1288    def write(self, data, num_retries=None):
1289        if self.mode[0] == "a":
1290            self._filepos = self.size()
1291        self.arvadosfile.writeto(self._filepos, data, num_retries)
1292        self._filepos += len(data)
1293        return len(data)
@retry_method
def writelines(self, seq, num_retries=None):
1295    @_FileLikeObjectBase._before_close
1296    @retry_method
1297    def writelines(self, seq, num_retries=None):
1298        for s in seq:
1299            self.write(s, num_retries=num_retries)
def truncate(self, size=None):
1301    @_FileLikeObjectBase._before_close
1302    def truncate(self, size=None):
1303        if size is None:
1304            size = self._filepos
1305        self.arvadosfile.truncate(size)
def flush(self):
1307    @_FileLikeObjectBase._before_close
1308    def flush(self):
1309        self.arvadosfile.flush()
def close(self, flush=True):
1311    def close(self, flush=True):
1312        if not self.closed:
1313            self.arvadosfile.remove_writer(self, flush)
1314            super(ArvadosFileWriter, self).close()
class WrappableFile:
1317class WrappableFile(object):
1318    """An interface to an Arvados file that's compatible with io wrappers.
1319
1320    """
1321    def __init__(self, f):
1322        self.f = f
1323        self.closed = False
1324    def close(self):
1325        self.closed = True
1326        return self.f.close()
1327    def flush(self):
1328        return self.f.flush()
1329    def read(self, *args, **kwargs):
1330        return self.f.read(*args, **kwargs)
1331    def readable(self):
1332        return self.f.readable()
1333    def readinto(self, *args, **kwargs):
1334        return self.f.readinto(*args, **kwargs)
1335    def seek(self, *args, **kwargs):
1336        return self.f.seek(*args, **kwargs)
1337    def seekable(self):
1338        return self.f.seekable()
1339    def tell(self):
1340        return self.f.tell()
1341    def writable(self):
1342        return self.f.writable()
1343    def write(self, *args, **kwargs):
1344        return self.f.write(*args, **kwargs)

An interface to an Arvados file that’s compatible with io wrappers.
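
A sketch of the intended use: handing an ArvadosFileReader to the standard io wrappers, which expect the raw-stream interface that WrappableFile provides.

    import io
    from arvados.arvfile import ArvadosFileReader, WrappableFile

    raw = WrappableFile(ArvadosFileReader(arvfile))        # wrap a fresh reader (arvfile as above)
    buffered = io.BufferedReader(raw)                      # byte-oriented buffering
    text = io.TextIOWrapper(buffered, encoding='utf-8')    # line-oriented text access
    for line in text:
        print(line.rstrip())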

WrappableFile(f)
1321    def __init__(self, f):
1322        self.f = f
1323        self.closed = False
f
closed
def close(self):
1324    def close(self):
1325        self.closed = True
1326        return self.f.close()
def flush(self):
1327    def flush(self):
1328        return self.f.flush()
def read(self, *args, **kwargs):
1329    def read(self, *args, **kwargs):
1330        return self.f.read(*args, **kwargs)
def readable(self):
1331    def readable(self):
1332        return self.f.readable()
def readinto(self, *args, **kwargs):
1333    def readinto(self, *args, **kwargs):
1334        return self.f.readinto(*args, **kwargs)
def seek(self, *args, **kwargs):
1335    def seek(self, *args, **kwargs):
1336        return self.f.seek(*args, **kwargs)
def seekable(self):
1337    def seekable(self):
1338        return self.f.seekable()
def tell(self):
1339    def tell(self):
1340        return self.f.tell()
def writable(self):
1341    def writable(self):
1342        return self.f.writable()
def write(self, *args, **kwargs):
1343    def write(self, *args, **kwargs):
1344        return self.f.write(*args, **kwargs)