arvados.arvfile

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4
   5import bz2
   6import collections
   7import copy
   8import errno
   9import functools
  10import hashlib
  11import logging
  12import os
  13import queue
  14import re
  15import sys
  16import threading
  17import uuid
  18import zlib
  19
  20from . import config
  21from ._internal import streams
  22from .errors import KeepWriteError, AssertionError, ArgumentError
  23from .keep import KeepLocator
  24from .retry import retry_method
  25
  26ADD = "add"
  27"""Argument value for `Collection` methods to represent an added item"""
  28DEL = "del"
  29"""Argument value for `Collection` methods to represent a removed item"""
  30MOD = "mod"
  31"""Argument value for `Collection` methods to represent a modified item"""
  32TOK = "tok"
  33"""Argument value for `Collection` methods to represent an item with token differences"""
  34WRITE = "write"
  35"""Argument value for `Collection` methods to represent that a file was written to"""
  36
  37_logger = logging.getLogger('arvados.arvfile')
  38
  39def split(path):
  40    """split(path) -> streamname, filename
  41
  42    Separate the stream name and file name in a /-separated stream path and
  43    return a tuple (stream_name, file_name).  If no stream name is available,
  44    assume '.'.
  45
  46    """
  47    try:
  48        stream_name, file_name = path.rsplit('/', 1)
  49    except ValueError:  # No / in string
  50        stream_name, file_name = '.', path
  51    return stream_name, file_name
  52
  53
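For example, `split` is pure string handling and needs no Arvados client:

    split("data/reads/sample1.fastq")   # -> ("data/reads", "sample1.fastq")
    split("sample1.fastq")              # -> (".", "sample1.fastq"); no '/' in path
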
  54class UnownedBlockError(Exception):
  55    """Raised when there's an writable block without an owner on the BlockManager."""
  56    pass
  57
  58
  59class _FileLikeObjectBase(object):
  60    def __init__(self, name, mode):
  61        self.name = name
  62        self.mode = mode
  63        self.closed = False
  64
  65    @staticmethod
  66    def _before_close(orig_func):
  67        @functools.wraps(orig_func)
  68        def before_close_wrapper(self, *args, **kwargs):
  69            if self.closed:
  70                raise ValueError("I/O operation on closed stream file")
  71            return orig_func(self, *args, **kwargs)
  72        return before_close_wrapper
  73
  74    def __enter__(self):
  75        return self
  76
  77    def __exit__(self, exc_type, exc_value, traceback):
  78        try:
  79            self.close()
  80        except Exception:
  81            if exc_type is None:
  82                raise
  83
  84    def close(self):
  85        self.closed = True
  86
  87
  88class ArvadosFileReaderBase(_FileLikeObjectBase):
  89    def __init__(self, name, mode, num_retries=None):
  90        super(ArvadosFileReaderBase, self).__init__(name, mode)
  91        self._filepos = 0
  92        self.num_retries = num_retries
  93        self._readline_cache = (None, None)
  94
  95    def __iter__(self):
  96        while True:
  97            data = self.readline()
  98            if not data:
  99                break
 100            yield data
 101
 102    def decompressed_name(self):
 103        return re.sub(r'\.(bz2|gz)$', '', self.name)
 104
 105    @_FileLikeObjectBase._before_close
 106    def seek(self, pos, whence=os.SEEK_SET):
 107        if whence == os.SEEK_CUR:
 108            pos += self._filepos
 109        elif whence == os.SEEK_END:
 110            pos += self.size()
 111        if pos < 0:
 112            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
 113        self._filepos = pos
 114        return self._filepos
 115
 116    def tell(self):
 117        return self._filepos
 118
 119    def readable(self):
 120        return True
 121
 122    def writable(self):
 123        return False
 124
 125    def seekable(self):
 126        return True
 127
 128    @_FileLikeObjectBase._before_close
 129    @retry_method
 130    def readall(self, size=2**20, num_retries=None):
 131        while True:
 132            data = self.read(size, num_retries=num_retries)
 133            if len(data) == 0:
 134                break
 135            yield data
 136
 137    @_FileLikeObjectBase._before_close
 138    @retry_method
 139    def readline(self, size=float('inf'), num_retries=None):
 140        cache_pos, cache_data = self._readline_cache
 141        if self.tell() == cache_pos:
 142            data = [cache_data]
 143            self._filepos += len(cache_data)
 144        else:
 145            data = [b'']
 146        data_size = len(data[-1])
 147        while (data_size < size) and (b'\n' not in data[-1]):
 148            next_read = self.read(2 ** 20, num_retries=num_retries)
 149            if not next_read:
 150                break
 151            data.append(next_read)
 152            data_size += len(next_read)
 153        data = b''.join(data)
 154        try:
 155            nextline_index = data.index(b'\n') + 1
 156        except ValueError:
 157            nextline_index = len(data)
 158        nextline_index = min(nextline_index, size)
 159        self._filepos -= len(data) - nextline_index
 160        self._readline_cache = (self.tell(), data[nextline_index:])
 161        return data[:nextline_index].decode()
 162
 163    @_FileLikeObjectBase._before_close
 164    @retry_method
 165    def decompress(self, decompress, size, num_retries=None):
 166        for segment in self.readall(size, num_retries=num_retries):
 167            data = decompress(segment)
 168            if data:
 169                yield data
 170
 171    @_FileLikeObjectBase._before_close
 172    @retry_method
 173    def readall_decompressed(self, size=2**20, num_retries=None):
 174        self.seek(0)
 175        if self.name.endswith('.bz2'):
 176            dc = bz2.BZ2Decompressor()
 177            return self.decompress(dc.decompress, size,
 178                                   num_retries=num_retries)
 179        elif self.name.endswith('.gz'):
 180            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
 181            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
 182                                   size, num_retries=num_retries)
 183        else:
 184            return self.readall(size, num_retries=num_retries)
 185
 186    @_FileLikeObjectBase._before_close
 187    @retry_method
 188    def readlines(self, sizehint=float('inf'), num_retries=None):
 189        data = []
 190        data_size = 0
 191        for s in self.readall(num_retries=num_retries):
 192            data.append(s)
 193            data_size += len(s)
 194            if data_size >= sizehint:
 195                break
 196        return b''.join(data).decode().splitlines(True)
 197
 198    def size(self):
 199        raise IOError(errno.ENOSYS, "Not implemented")
 200
 201    def read(self, size, num_retries=None):
 202        raise IOError(errno.ENOSYS, "Not implemented")
 203
 204    def readfrom(self, start, size, num_retries=None):
 205        raise IOError(errno.ENOSYS, "Not implemented")
 206
 207
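The helpers above layer line-oriented and transparently decompressed reads on top of the raw `read()`/`readfrom()` primitives that subclasses implement. A minimal sketch, assuming `reader` is an `ArvadosFileReader` (defined later in this module) positioned on a gzip-compressed log file:

    # Sketch only: `reader` is assumed to be an ArvadosFileReader for "logs.gz".
    # readall_decompressed() seeks to 0 and picks a decompressor from the name.
    for chunk in reader.readall_decompressed(size=2**20):
        handle(chunk)               # `handle` is a placeholder for your own code

    # On plain-text files the reader is also iterable, one line at a time:
    for line in reader:             # __iter__() calls readline() until EOF
        print(line.rstrip())
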
 208def synchronized(orig_func):
 209    @functools.wraps(orig_func)
 210    def synchronized_wrapper(self, *args, **kwargs):
 211        with self.lock:
 212            return orig_func(self, *args, **kwargs)
 213    return synchronized_wrapper
 214
 215
 216class StateChangeError(Exception):
 217    def __init__(self, message, state, nextstate):
 218        super(StateChangeError, self).__init__(message)
 219        self.state = state
 220        self.nextstate = nextstate
 221
 222class _BufferBlock(object):
 223    """A stand-in for a Keep block that is in the process of being written.
 224
 225    Writers can append to it, get the size, and compute the Keep locator.
 226    There are three valid states:
 227
 228    WRITABLE
 229      Can append to block.
 230
 231    PENDING
 232      Block is in the process of being uploaded to Keep, append is an error.
 233
 234    COMMITTED
 235      The block has been written to Keep, its internal buffer has been
 236      released, fetching the block will fetch it via keep client (since we
 237      discarded the internal copy), and identifiers referring to the BufferBlock
 238      can be replaced with the block locator.
 239
 240    """
 241
 242    WRITABLE = 0
 243    PENDING = 1
 244    COMMITTED = 2
 245    ERROR = 3
 246    DELETED = 4
 247
 248    def __init__(self, blockid, starting_capacity, owner):
 249        """
 250        :blockid:
 251          the identifier for this block
 252
 253        :starting_capacity:
 254          the initial buffer capacity
 255
 256        :owner:
 257          ArvadosFile that owns this block
 258
 259        """
 260        self.blockid = blockid
 261        self.buffer_block = bytearray(starting_capacity)
 262        self.buffer_view = memoryview(self.buffer_block)
 263        self.write_pointer = 0
 264        self._state = _BufferBlock.WRITABLE
 265        self._locator = None
 266        self.owner = owner
 267        self.lock = threading.Lock()
 268        self.wait_for_commit = threading.Event()
 269        self.error = None
 270
 271    @synchronized
 272    def append(self, data):
 273        """Append some data to the buffer.
 274
 275        Only valid if the block is in WRITABLE state.  Implements an expanding
  276        buffer, doubling capacity as needed to accommodate all the data.
 277
 278        """
 279        if self._state == _BufferBlock.WRITABLE:
 280            if not isinstance(data, bytes) and not isinstance(data, memoryview):
 281                data = data.encode()
 282            while (self.write_pointer+len(data)) > len(self.buffer_block):
 283                new_buffer_block = bytearray(len(self.buffer_block) * 2)
 284                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
 285                self.buffer_block = new_buffer_block
 286                self.buffer_view = memoryview(self.buffer_block)
 287            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
 288            self.write_pointer += len(data)
 289            self._locator = None
 290        else:
 291            raise AssertionError("Buffer block is not writable")
 292
 293    STATE_TRANSITIONS = frozenset([
 294            (WRITABLE, PENDING),
 295            (PENDING, COMMITTED),
 296            (PENDING, ERROR),
 297            (ERROR, PENDING)])
 298
 299    @synchronized
 300    def set_state(self, nextstate, val=None):
 301        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
 302            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
 303        self._state = nextstate
 304
 305        if self._state == _BufferBlock.PENDING:
 306            self.wait_for_commit.clear()
 307
 308        if self._state == _BufferBlock.COMMITTED:
 309            self._locator = val
 310            self.buffer_view = None
 311            self.buffer_block = None
 312            self.wait_for_commit.set()
 313
 314        if self._state == _BufferBlock.ERROR:
 315            self.error = val
 316            self.wait_for_commit.set()
 317
 318    @synchronized
 319    def state(self):
 320        return self._state
 321
 322    def size(self):
 323        """The amount of data written to the buffer."""
 324        return self.write_pointer
 325
 326    @synchronized
 327    def locator(self):
 328        """The Keep locator for this buffer's contents."""
 329        if self._locator is None:
 330            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
 331        return self._locator
 332
 333    @synchronized
 334    def clone(self, new_blockid, owner):
 335        if self._state == _BufferBlock.COMMITTED:
 336            raise AssertionError("Cannot duplicate committed buffer block")
 337        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
 338        bufferblock.append(self.buffer_view[0:self.size()])
 339        return bufferblock
 340
 341    @synchronized
 342    def clear(self):
 343        self._state = _BufferBlock.DELETED
 344        self.owner = None
 345        self.buffer_block = None
 346        self.buffer_view = None
 347
 348    @synchronized
 349    def repack_writes(self):
 350        """Optimize buffer block by repacking segments in file sequence.
 351
 352        When the client makes random writes, they appear in the buffer block in
 353        the sequence they were written rather than the sequence they appear in
 354        the file.  This makes for inefficient, fragmented manifests.  Attempt
 355        to optimize by repacking writes in file sequence.
 356
 357        """
 358        if self._state != _BufferBlock.WRITABLE:
 359            raise AssertionError("Cannot repack non-writable block")
 360
 361        segs = self.owner.segments()
 362
 363        # Collect the segments that reference the buffer block.
 364        bufferblock_segs = [s for s in segs if s.locator == self.blockid]
 365
 366        # Collect total data referenced by segments (could be smaller than
 367        # bufferblock size if a portion of the file was written and
 368        # then overwritten).
 369        write_total = sum([s.range_size for s in bufferblock_segs])
 370
 371        if write_total < self.size() or len(bufferblock_segs) > 1:
 372            # If there's more than one segment referencing this block, it is
 373            # due to out-of-order writes and will produce a fragmented
 374            # manifest, so try to optimize by re-packing into a new buffer.
 375            contents = self.buffer_view[0:self.write_pointer].tobytes()
 376            new_bb = _BufferBlock(None, write_total, None)
 377            for t in bufferblock_segs:
 378                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
 379                t.segment_offset = new_bb.size() - t.range_size
 380
 381            self.buffer_block = new_bb.buffer_block
 382            self.buffer_view = new_bb.buffer_view
 383            self.write_pointer = new_bb.write_pointer
 384            self._locator = None
 385            new_bb.clear()
 386            self.owner.set_segments(segs)
 387
 388    def __repr__(self):
 389        return "<BufferBlock %s>" % (self.blockid)
 390
 391
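A small self-contained illustration of the `_BufferBlock` lifecycle described above (WRITABLE, then PENDING, then COMMITTED). `_BufferBlock` is internal API and is normally managed by `_BlockManager`, so this is only a sketch of the mechanics:

    from arvados.arvfile import _BufferBlock

    bb = _BufferBlock("example-block-id", starting_capacity=4, owner=None)
    bb.append(b"foo")                    # the buffer grows on demand
    print(bb.size())                     # 3
    print(bb.locator())                  # md5 of the contents plus the size:
                                         # acbd18db4cc2f85cedef654fccc4a4d8+3

    bb.set_state(_BufferBlock.PENDING)   # further appends now raise an error
    bb.set_state(_BufferBlock.COMMITTED, "acbd18db4cc2f85cedef654fccc4a4d8+3")
    print(bb.state() == _BufferBlock.COMMITTED)   # True
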
 392class NoopLock(object):
 393    def __enter__(self):
 394        return self
 395
 396    def __exit__(self, exc_type, exc_value, traceback):
 397        pass
 398
 399    def acquire(self, blocking=False):
 400        pass
 401
 402    def release(self):
 403        pass
 404
 405
 406def must_be_writable(orig_func):
 407    @functools.wraps(orig_func)
 408    def must_be_writable_wrapper(self, *args, **kwargs):
 409        if not self.writable():
 410            raise IOError(errno.EROFS, "Collection is read-only.")
 411        return orig_func(self, *args, **kwargs)
 412    return must_be_writable_wrapper
 413
 414
 415class _BlockManager(object):
 416    """BlockManager handles buffer blocks.
 417
 418    Also handles background block uploads, and background block prefetch for a
 419    Collection of ArvadosFiles.
 420
 421    """
 422
 423    DEFAULT_PUT_THREADS = 2
 424
 425    def __init__(self, keep,
 426                 copies=None,
 427                 put_threads=None,
 428                 num_retries=None,
 429                 storage_classes_func=None):
 430        """keep: KeepClient object to use"""
 431        self._keep = keep
 432        self._bufferblocks = collections.OrderedDict()
 433        self._put_queue = None
 434        self._put_threads = None
 435        self.lock = threading.Lock()
 436        self.prefetch_lookahead = self._keep.num_prefetch_threads
 437        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
 438        self.copies = copies
 439        self.storage_classes = storage_classes_func or (lambda: [])
 440        self._pending_write_size = 0
 441        self.threads_lock = threading.Lock()
 442        self.padding_block = None
 443        self.num_retries = num_retries
 444
 445    @synchronized
 446    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
 447        """Allocate a new, empty bufferblock in WRITABLE state and return it.
 448
 449        :blockid:
 450          optional block identifier, otherwise one will be automatically assigned
 451
 452        :starting_capacity:
 453          optional capacity, otherwise will use default capacity
 454
 455        :owner:
 456          ArvadosFile that owns this block
 457
 458        """
 459        return self._alloc_bufferblock(blockid, starting_capacity, owner)
 460
 461    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
 462        if blockid is None:
 463            blockid = str(uuid.uuid4())
 464        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
 465        self._bufferblocks[bufferblock.blockid] = bufferblock
 466        return bufferblock
 467
 468    @synchronized
 469    def dup_block(self, block, owner):
 470        """Create a new bufferblock initialized with the content of an existing bufferblock.
 471
 472        :block:
 473          the buffer block to copy.
 474
 475        :owner:
 476          ArvadosFile that owns the new block
 477
 478        """
 479        new_blockid = str(uuid.uuid4())
 480        bufferblock = block.clone(new_blockid, owner)
 481        self._bufferblocks[bufferblock.blockid] = bufferblock
 482        return bufferblock
 483
 484    @synchronized
 485    def is_bufferblock(self, locator):
 486        return locator in self._bufferblocks
 487
 488    def _commit_bufferblock_worker(self):
 489        """Background uploader thread."""
 490
 491        while True:
 492            try:
 493                bufferblock = self._put_queue.get()
 494                if bufferblock is None:
 495                    return
 496
 497                if self.copies is None:
 498                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
 499                else:
 500                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
 501                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 502            except Exception as e:
 503                bufferblock.set_state(_BufferBlock.ERROR, e)
 504            finally:
 505                if self._put_queue is not None:
 506                    self._put_queue.task_done()
 507
 508    def start_put_threads(self):
 509        with self.threads_lock:
 510            if self._put_threads is None:
 511                # Start uploader threads.
 512
 513                # If we don't limit the Queue size, the upload queue can quickly
 514                # grow to take up gigabytes of RAM if the writing process is
 515                # generating data more quickly than it can be sent to the Keep
 516                # servers.
 517                #
 518                # With two upload threads and a queue size of 2, this means up to 4
 519                # blocks pending.  If they are full 64 MiB blocks, that means up to
 520                # 256 MiB of internal buffering, which is the same size as the
 521                # default download block cache in KeepClient.
 522                self._put_queue = queue.Queue(maxsize=2)
 523
 524                self._put_threads = []
 525                for i in range(0, self.num_put_threads):
 526                    thread = threading.Thread(target=self._commit_bufferblock_worker)
 527                    self._put_threads.append(thread)
 528                    thread.daemon = True
 529                    thread.start()
 530
 531    @synchronized
 532    def stop_threads(self):
 533        """Shut down and wait for background upload and download threads to finish."""
 534
 535        if self._put_threads is not None:
 536            for t in self._put_threads:
 537                self._put_queue.put(None)
 538            for t in self._put_threads:
 539                t.join()
 540        self._put_threads = None
 541        self._put_queue = None
 542
 543    def __enter__(self):
 544        return self
 545
 546    def __exit__(self, exc_type, exc_value, traceback):
 547        self.stop_threads()
 548
 549    @synchronized
 550    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
 551        """Packs small blocks together before uploading"""
 552
 553        self._pending_write_size += closed_file_size
 554
 555        # Check if there are enough small blocks for filling up one in full
 556        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
 557            return
 558
 559        # Search blocks ready for getting packed together before being
 560        # committed to Keep.
 561        # A WRITABLE block always has an owner.
 562        # A WRITABLE block with its owner.closed() implies that its
 563        # size is <= KEEP_BLOCK_SIZE/2.
 564        try:
 565            small_blocks = [b for b in self._bufferblocks.values()
 566                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
 567        except AttributeError:
 568            # Writable blocks without owner shouldn't exist.
 569            raise UnownedBlockError()
 570
 571        if len(small_blocks) <= 1:
 572            # Not enough small blocks for repacking
 573            return
 574
 575        for bb in small_blocks:
 576            bb.repack_writes()
 577
 578        # Update the pending write size count with its true value, just in case
 579        # some small file was opened, written and closed several times.
 580        self._pending_write_size = sum([b.size() for b in small_blocks])
 581
 582        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
 583            return
 584
 585        new_bb = self._alloc_bufferblock()
 586        new_bb.owner = []
 587        files = []
 588        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
 589            bb = small_blocks.pop(0)
 590            new_bb.owner.append(bb.owner)
 591            self._pending_write_size -= bb.size()
 592            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
 593            files.append((bb, new_bb.write_pointer - bb.size()))
 594
 595        self.commit_bufferblock(new_bb, sync=sync)
 596
 597        for bb, new_bb_segment_offset in files:
 598            newsegs = bb.owner.segments()
 599            for s in newsegs:
 600                if s.locator == bb.blockid:
 601                    s.locator = new_bb.blockid
 602                    s.segment_offset = new_bb_segment_offset+s.segment_offset
 603            bb.owner.set_segments(newsegs)
 604            self._delete_bufferblock(bb.blockid)
 605
 606    def commit_bufferblock(self, block, sync):
 607        """Initiate a background upload of a bufferblock.
 608
 609        :block:
 610          The block object to upload
 611
 612        :sync:
 613          If `sync` is True, upload the block synchronously.
 614          If `sync` is False, upload the block asynchronously.  This will
 615          return immediately unless the upload queue is at capacity, in
 616          which case it will wait on an upload queue slot.
 617
 618        """
 619        try:
  620            # Mark the block as PENDING so as to disallow any more appends.
 621            block.set_state(_BufferBlock.PENDING)
 622        except StateChangeError as e:
 623            if e.state == _BufferBlock.PENDING:
 624                if sync:
 625                    block.wait_for_commit.wait()
 626                else:
 627                    return
 628            if block.state() == _BufferBlock.COMMITTED:
 629                return
 630            elif block.state() == _BufferBlock.ERROR:
 631                raise block.error
 632            else:
 633                raise
 634
 635        if sync:
 636            try:
 637                if self.copies is None:
 638                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
 639                else:
 640                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
 641                block.set_state(_BufferBlock.COMMITTED, loc)
 642            except Exception as e:
 643                block.set_state(_BufferBlock.ERROR, e)
 644                raise
 645        else:
 646            self.start_put_threads()
 647            self._put_queue.put(block)
 648
 649    @synchronized
 650    def get_bufferblock(self, locator):
 651        return self._bufferblocks.get(locator)
 652
 653    @synchronized
 654    def get_padding_block(self):
  655        """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
 656        when using truncate() to extend the size of a file.
 657
 658        For reference (and possible future optimization), the md5sum of the
 659        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
 660
 661        """
 662
 663        if self.padding_block is None:
 664            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
 665            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
 666            self.commit_bufferblock(self.padding_block, False)
 667        return self.padding_block
 668
 669    @synchronized
 670    def delete_bufferblock(self, locator):
 671        self._delete_bufferblock(locator)
 672
 673    def _delete_bufferblock(self, locator):
 674        if locator in self._bufferblocks:
 675            bb = self._bufferblocks[locator]
 676            bb.clear()
 677            del self._bufferblocks[locator]
 678
 679    def get_block_contents(self, locator, num_retries, cache_only=False):
 680        """Fetch a block.
 681
 682        First checks to see if the locator is a BufferBlock and return that, if
 683        not, passes the request through to KeepClient.get().
 684
 685        """
 686        with self.lock:
 687            if locator in self._bufferblocks:
 688                bufferblock = self._bufferblocks[locator]
 689                if bufferblock.state() != _BufferBlock.COMMITTED:
 690                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
 691                else:
 692                    locator = bufferblock._locator
 693        if cache_only:
 694            return self._keep.get_from_cache(locator)
 695        else:
 696            return self._keep.get(locator, num_retries=num_retries)
 697
 698    def commit_all(self):
 699        """Commit all outstanding buffer blocks.
 700
 701        This is a synchronous call, and will not return until all buffer blocks
 702        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 703
 704        """
 705        self.repack_small_blocks(force=True, sync=True)
 706
 707        with self.lock:
 708            items = list(self._bufferblocks.items())
 709
 710        for k,v in items:
 711            if v.state() != _BufferBlock.COMMITTED and v.owner:
  712                # Ignore blocks with a list of owners: if they're not in COMMITTED
  713                # state, they're already being committed asynchronously.
 714                if isinstance(v.owner, ArvadosFile):
 715                    v.owner.flush(sync=False)
 716
 717        with self.lock:
 718            if self._put_queue is not None:
 719                self._put_queue.join()
 720
 721                err = []
 722                for k,v in items:
 723                    if v.state() == _BufferBlock.ERROR:
 724                        err.append((v.locator(), v.error))
 725                if err:
 726                    raise KeepWriteError("Error writing some blocks", err, label="block")
 727
 728        for k,v in items:
 729            # flush again with sync=True to remove committed bufferblocks from
 730            # the segments.
 731            if v.owner:
 732                if isinstance(v.owner, ArvadosFile):
 733                    v.owner.flush(sync=True)
 734                elif isinstance(v.owner, list) and len(v.owner) > 0:
 735                    # This bufferblock is referenced by many files as a result
 736                    # of repacking small blocks, so don't delete it when flushing
 737                    # its owners, just do it after flushing them all.
 738                    for owner in v.owner:
 739                        owner.flush(sync=True)
 740                    self.delete_bufferblock(k)
 741
 742        self.stop_threads()
 743
 744    def block_prefetch(self, locator):
 745        """Initiate a background download of a block.
 746        """
 747
 748        if not self.prefetch_lookahead:
 749            return
 750
 751        with self.lock:
 752            if locator in self._bufferblocks:
 753                return
 754
 755        self._keep.block_prefetch(locator)
 756
 757
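The block manager is normally created and driven by a `Collection`; the sketch below exercises the same alloc/commit path directly. The `StubKeep` class is hypothetical, supplied only so the example is self-contained (a real `KeepClient` would be used in practice):

    import hashlib
    from arvados.arvfile import _BlockManager, _BufferBlock

    class StubKeep:
        """Hypothetical stand-in for a KeepClient; stores blocks in a dict."""
        num_prefetch_threads = 0        # read by _BlockManager.__init__()

        def __init__(self):
            self.blocks = {}

        def put(self, data, num_retries=None, copies=None, classes=None):
            loc = "%s+%d" % (hashlib.md5(data).hexdigest(), len(data))
            self.blocks[loc] = data
            return loc

        def get(self, locator, num_retries=None):
            return self.blocks[locator]

    with _BlockManager(StubKeep()) as bm:
        bb = bm.alloc_bufferblock()              # new WRITABLE buffer block
        bb.append(b"hello keep")
        bm.commit_bufferblock(bb, sync=True)     # uploads through keep.put()
        print(bb.state() == _BufferBlock.COMMITTED)   # True
        print(bb.locator())                      # permanent Keep locator
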
 758class ArvadosFile(object):
 759    """Represent a file in a Collection.
 760
 761    ArvadosFile manages the underlying representation of a file in Keep as a
 762    sequence of segments spanning a set of blocks, and implements random
 763    read/write access.
 764
 765    This object may be accessed from multiple threads.
 766
 767    """
 768
 769    __slots__ = ('parent', 'name', '_writers', '_committed',
 770                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 771
 772    def __init__(self, parent, name, stream=[], segments=[]):
 773        """
 774        ArvadosFile constructor.
 775
 776        :stream:
 777          a list of Range objects representing a block stream
 778
 779        :segments:
 780          a list of Range objects representing segments
 781        """
 782        self.parent = parent
 783        self.name = name
 784        self._writers = set()
 785        self._committed = False
 786        self._segments = []
 787        self.lock = parent.root_collection().lock
 788        for s in segments:
 789            self._add_segment(stream, s.locator, s.range_size)
 790        self._current_bblock = None
 791        self._read_counter = 0
 792
 793    def writable(self):
 794        return self.parent.writable()
 795
 796    @synchronized
 797    def permission_expired(self, as_of_dt=None):
  798        """Returns True if any of the segments' locators has expired"""
 799        for r in self._segments:
 800            if KeepLocator(r.locator).permission_expired(as_of_dt):
 801                return True
 802        return False
 803
 804    @synchronized
 805    def has_remote_blocks(self):
  806        """Returns True if any of the segments' locators has a +R signature"""
 807
 808        for s in self._segments:
 809            if '+R' in s.locator:
 810                return True
 811        return False
 812
 813    @synchronized
 814    def _copy_remote_blocks(self, remote_blocks={}):
 815        """Ask Keep to copy remote blocks and point to their local copies.
 816
 817        This is called from the parent Collection.
 818
 819        :remote_blocks:
 820            Shared cache of remote to local block mappings. This is used to avoid
 821            doing extra work when blocks are shared by more than one file in
 822            different subdirectories.
 823        """
 824
 825        for s in self._segments:
 826            if '+R' in s.locator:
 827                try:
 828                    loc = remote_blocks[s.locator]
 829                except KeyError:
 830                    loc = self.parent._my_keep().refresh_signature(s.locator)
 831                    remote_blocks[s.locator] = loc
 832                s.locator = loc
 833                self.parent.set_committed(False)
 834        return remote_blocks
 835
 836    @synchronized
 837    def segments(self):
 838        return copy.copy(self._segments)
 839
 840    @synchronized
 841    def clone(self, new_parent, new_name):
 842        """Make a copy of this file."""
 843        cp = ArvadosFile(new_parent, new_name)
 844        cp.replace_contents(self)
 845        return cp
 846
 847    @must_be_writable
 848    @synchronized
 849    def replace_contents(self, other):
 850        """Replace segments of this file with segments from another `ArvadosFile` object."""
 851
 852        eventtype = TOK if self == other else MOD
 853
 854        map_loc = {}
 855        self._segments = []
 856        for other_segment in other.segments():
 857            new_loc = other_segment.locator
 858            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
 859                if other_segment.locator not in map_loc:
 860                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
 861                    if bufferblock.state() != _BufferBlock.WRITABLE:
 862                        map_loc[other_segment.locator] = bufferblock.locator()
 863                    else:
 864                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
 865                new_loc = map_loc[other_segment.locator]
 866
 867            self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 868
 869        self.set_committed(False)
 870        self.parent.notify(eventtype, self.parent, self.name, (self, self))
 871
 872    def __eq__(self, other):
 873        if other is self:
 874            return True
 875        if not isinstance(other, ArvadosFile):
 876            return False
 877
 878        othersegs = other.segments()
 879        with self.lock:
 880            if len(self._segments) != len(othersegs):
 881                return False
 882            for i in range(0, len(othersegs)):
 883                seg1 = self._segments[i]
 884                seg2 = othersegs[i]
 885                loc1 = seg1.locator
 886                loc2 = seg2.locator
 887
 888                if self.parent._my_block_manager().is_bufferblock(loc1):
 889                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
 890
 891                if other.parent._my_block_manager().is_bufferblock(loc2):
 892                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
 893
 894                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
 895                    seg1.range_start != seg2.range_start or
 896                    seg1.range_size != seg2.range_size or
 897                    seg1.segment_offset != seg2.segment_offset):
 898                    return False
 899
 900        return True
 901
 902    def __ne__(self, other):
 903        return not self.__eq__(other)
 904
 905    @synchronized
 906    def set_segments(self, segs):
 907        self._segments = segs
 908
 909    @synchronized
 910    def set_committed(self, value=True):
 911        """Set committed flag.
 912
 913        If value is True, set committed to be True.
 914
 915        If value is False, set committed to be False for this and all parents.
 916        """
 917        if value == self._committed:
 918            return
 919        self._committed = value
 920        if self._committed is False and self.parent is not None:
 921            self.parent.set_committed(False)
 922
 923    @synchronized
 924    def committed(self):
 925        """Get whether this is committed or not."""
 926        return self._committed
 927
 928    @synchronized
 929    def add_writer(self, writer):
 930        """Add an ArvadosFileWriter reference to the list of writers"""
 931        if isinstance(writer, ArvadosFileWriter):
 932            self._writers.add(writer)
 933
 934    @synchronized
 935    def remove_writer(self, writer, flush):
 936        """
 937        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 938        and do some block maintenance tasks.
 939        """
 940        self._writers.remove(writer)
 941
 942        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 943            # File writer closed, not small enough for repacking
 944            self.flush()
 945        elif self.closed():
 946            # All writers closed and size is adequate for repacking
 947            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 948
 949    def closed(self):
 950        """
 951        Get whether this is closed or not. When the writers list is empty, the file
 952        is supposed to be closed.
 953        """
 954        return len(self._writers) == 0
 955
 956    @must_be_writable
 957    @synchronized
 958    def truncate(self, size):
 959        """Shrink or expand the size of the file.
 960
 961        If `size` is less than the size of the file, the file contents after
 962        `size` will be discarded.  If `size` is greater than the current size
 963        of the file, it will be filled with zero bytes.
 964
 965        """
 966        if size < self.size():
 967            new_segs = []
 968            for r in self._segments:
 969                range_end = r.range_start+r.range_size
 970                if r.range_start >= size:
  971                    # segment is past the truncate size, all done
 972                    break
 973                elif size < range_end:
 974                    nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0)
 975                    nr.segment_offset = r.segment_offset
 976                    new_segs.append(nr)
 977                    break
 978                else:
 979                    new_segs.append(r)
 980
 981            self._segments = new_segs
 982            self.set_committed(False)
 983        elif size > self.size():
 984            padding = self.parent._my_block_manager().get_padding_block()
 985            diff = size - self.size()
 986            while diff > config.KEEP_BLOCK_SIZE:
 987                self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
 988                diff -= config.KEEP_BLOCK_SIZE
 989            if diff > 0:
 990                self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0))
 991            self.set_committed(False)
 992        else:
 993            # size == self.size()
 994            pass
 995
 996    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
 997        """Read up to `size` bytes from the file starting at `offset`.
 998
 999        Arguments:
1000
1001        * exact: bool --- If False (default), return less data than
1002         requested if the read crosses a block boundary and the next
1003         block isn't cached.  If True, only return less data than
1004         requested when hitting EOF.
1005
1006        * return_memoryview: bool --- If False (default) return a
1007          `bytes` object, which may entail making a copy in some
1008          situations.  If True, return a `memoryview` object which may
1009          avoid making a copy, but may be incompatible with code
1010          expecting a `bytes` object.
1011
1012        """
1013
1014        with self.lock:
1015            if size == 0 or offset >= self.size():
1016                return memoryview(b'') if return_memoryview else b''
1017            readsegs = streams.locators_and_ranges(self._segments, offset, size)
1018
1019            prefetch = None
1020            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1021            if prefetch_lookahead:
1022                # Doing prefetch on every read() call is surprisingly expensive
1023                # when we're trying to deliver data at 600+ MiBps and want
1024                # the read() fast path to be as lightweight as possible.
1025                #
1026                # Only prefetching every 128 read operations
1027                # dramatically reduces the overhead while still
1028                # getting the benefit of prefetching (e.g. when
1029                # reading 128 KiB at a time, it checks for prefetch
1030                # every 16 MiB).
1031                self._read_counter = (self._read_counter+1) % 128
1032                if self._read_counter == 1:
1033                    prefetch = streams.locators_and_ranges(
1034                        self._segments,
1035                        offset + size,
1036                        config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1037                        limit=(1+prefetch_lookahead),
1038                    )
1039
1040        locs = set()
1041        data = []
1042        for lr in readsegs:
1043            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1044            if block:
1045                blockview = memoryview(block)
1046                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1047                locs.add(lr.locator)
1048            else:
1049                break
1050
1051        if prefetch:
1052            for lr in prefetch:
1053                if lr.locator not in locs:
1054                    self.parent._my_block_manager().block_prefetch(lr.locator)
1055                    locs.add(lr.locator)
1056
1057        if len(data) == 1:
1058            return data[0] if return_memoryview else data[0].tobytes()
1059        else:
1060            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1061
1062
1063    @must_be_writable
1064    @synchronized
1065    def writeto(self, offset, data, num_retries):
1066        """Write `data` to the file starting at `offset`.
1067
1068        This will update existing bytes and/or extend the size of the file as
1069        necessary.
1070
1071        """
1072        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1073            data = data.encode()
1074        if len(data) == 0:
1075            return
1076
1077        if offset > self.size():
1078            self.truncate(offset)
1079
1080        if len(data) > config.KEEP_BLOCK_SIZE:
1081            # Chunk it up into smaller writes
1082            n = 0
1083            dataview = memoryview(data)
1084            while n < len(data):
1085                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1086                n += config.KEEP_BLOCK_SIZE
1087            return
1088
1089        self.set_committed(False)
1090
1091        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1092            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1093
1094        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1095            self._current_bblock.repack_writes()
1096            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1097                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1098                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1099
1100        self._current_bblock.append(data)
1101        streams.replace_range(
1102            self._segments,
1103            offset,
1104            len(data),
1105            self._current_bblock.blockid,
1106            self._current_bblock.write_pointer - len(data),
1107        )
1108        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1109        return len(data)
1110
1111    @synchronized
1112    def flush(self, sync=True, num_retries=0):
1113        """Flush the current bufferblock to Keep.
1114
1115        :sync:
1116          If True, commit block synchronously, wait until buffer block has been written.
1117          If False, commit block asynchronously, return immediately after putting block into
1118          the keep put queue.
1119        """
1120        if self.committed():
1121            return
1122
1123        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1124            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1125                self._current_bblock.repack_writes()
1126            if self._current_bblock.state() != _BufferBlock.DELETED:
1127                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1128
1129        if sync:
1130            to_delete = set()
1131            for s in self._segments:
1132                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1133                if bb:
1134                    if bb.state() != _BufferBlock.COMMITTED:
1135                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1136                    to_delete.add(s.locator)
1137                    s.locator = bb.locator()
1138            for s in to_delete:
1139                # Don't delete the bufferblock if it's owned by many files. It'll be
1140                # deleted after all of its owners are flush()ed.
1141                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1142                    self.parent._my_block_manager().delete_bufferblock(s)
1143
1144        self.parent.notify(MOD, self.parent, self.name, (self, self))
1145
1146    @must_be_writable
1147    @synchronized
1148    def add_segment(self, blocks, pos, size):
1149        """Add a segment to the end of the file.
1150
 1151        `pos` and `size` reference a section of the stream described by
1152        `blocks` (a list of Range objects)
1153
1154        """
1155        self._add_segment(blocks, pos, size)
1156
1157    def _add_segment(self, blocks, pos, size):
1158        """Internal implementation of add_segment."""
1159        self.set_committed(False)
1160        for lr in streams.locators_and_ranges(blocks, pos, size):
1161            last = self._segments[-1] if self._segments else streams.Range(0, 0, 0, 0)
1162            r = streams.Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1163            self._segments.append(r)
1164
1165    @synchronized
1166    def size(self):
1167        """Get the file size."""
1168        if self._segments:
1169            n = self._segments[-1]
1170            return n.range_start + n.range_size
1171        else:
1172            return 0
1173
1174    @synchronized
1175    def manifest_text(self, stream_name=".", portable_locators=False,
1176                      normalize=False, only_committed=False):
1177        buf = ""
1178        filestream = []
1179        for segment in self._segments:
1180            loc = segment.locator
1181            if self.parent._my_block_manager().is_bufferblock(loc):
1182                if only_committed:
1183                    continue
1184                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1185            if portable_locators:
1186                loc = KeepLocator(loc).stripped()
1187            filestream.append(streams.LocatorAndRange(
1188                loc,
1189                KeepLocator(loc).size,
1190                segment.segment_offset,
1191                segment.range_size,
1192            ))
1193        buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream}))
1194        buf += "\n"
1195        return buf
1196
1197    @must_be_writable
1198    @synchronized
1199    def _reparent(self, newparent, newname):
1200        self.set_committed(False)
1201        self.flush(sync=True)
1202        self.parent.remove(self.name)
1203        self.parent = newparent
1204        self.name = newname
1205        self.lock = self.parent.root_collection().lock
1206
1207
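Internally, an `ArvadosFile` is an ordered list of `Range` segments mapping file offsets onto regions of (buffer or Keep) blocks, and `readfrom()` uses `streams.locators_and_ranges()` to turn a read request into per-block reads. A sketch of that translation using the internal `arvados._internal.streams` helpers directly (internal API, so the module path may change between SDK versions):

    from arvados._internal import streams

    # Two segments: file bytes 0-9 come from offset 0 of block A,
    # file bytes 10-24 come from offset 5 of block B.
    segments = [
        streams.Range("blockA", 0, 10, 0),
        streams.Range("blockB", 10, 15, 5),
    ]

    # Which block regions cover file offsets 8..17 (a 10-byte read)?
    for lr in streams.locators_and_ranges(segments, 8, 10):
        print(lr.locator, lr.segment_offset, lr.segment_size)
    # Expected, given the mapping above:
    #   blockA 8 2    (last two bytes of the first segment)
    #   blockB 5 8    (first eight bytes of the second segment)
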
1208class ArvadosFileReader(ArvadosFileReaderBase):
1209    """Wraps ArvadosFile in a file-like object supporting reading only.
1210
1211    Be aware that this class is NOT thread safe as there is no locking around
1212    updating file pointer.
1213
1214    """
1215
1216    def __init__(self, arvadosfile, mode="r", num_retries=None):
1217        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1218        self.arvadosfile = arvadosfile
1219
1220    def size(self):
1221        return self.arvadosfile.size()
1222
1223    def stream_name(self):
1224        return self.arvadosfile.parent.stream_name()
1225
1226    def readinto(self, b):
1227        data = self.read(len(b))
1228        b[:len(data)] = data
1229        return len(data)
1230
1231    @_FileLikeObjectBase._before_close
1232    @retry_method
1233    def read(self, size=-1, num_retries=None, return_memoryview=False):
1234        """Read up to `size` bytes from the file and return the result.
1235
1236        Starts at the current file position.  If `size` is negative or None,
1237        read the entire remainder of the file.
1238
 1239        Returns an empty result if the file pointer is at the end of the file.
1240
1241        Returns a `bytes` object, unless `return_memoryview` is True,
1242        in which case it returns a memory view, which may avoid an
1243        unnecessary data copy in some situations.
1244
1245        """
 1246        if size is None or size < 0:
1247            data = []
1248            #
1249            # specify exact=False, return_memoryview=True here so that we
1250            # only copy data once into the final buffer.
1251            #
1252            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1253            while rd:
1254                data.append(rd)
1255                self._filepos += len(rd)
1256                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1257            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1258        else:
1259            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1260            self._filepos += len(data)
1261            return data
1262
1263    @_FileLikeObjectBase._before_close
1264    @retry_method
1265    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1266        """Read up to `size` bytes from the stream, starting at the specified file offset.
1267
1268        This method does not change the file position.
1269
1270        Returns a `bytes` object, unless `return_memoryview` is True,
1271        in which case it returns a memory view, which may avoid an
1272        unnecessary data copy in some situations.
1273
1274        """
1275        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
1276
1277    def flush(self):
1278        pass
1279
1280
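Readers are rarely constructed directly; `arvados.collection.Collection.open()` (or `CollectionReader.open()`) returns them, possibly wrapped in an `io` buffer or text wrapper depending on the mode and SDK version. A sketch assuming a configured client (ARVADOS_API_HOST / ARVADOS_API_TOKEN); the collection locator is a placeholder:

    import arvados.collection

    coll = arvados.collection.CollectionReader("replace-with-a-real-pdh-or-uuid")
    with coll.open("data/sample.txt", "rb") as f:
        header = f.read(1024)        # read from the current file position
        f.seek(0)                    # readers are seekable
        everything = f.read()
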
1281class ArvadosFileWriter(ArvadosFileReader):
1282    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1283
1284    Be aware that this class is NOT thread safe as there is no locking around
1285    updating file pointer.
1286
1287    """
1288
1289    def __init__(self, arvadosfile, mode, num_retries=None):
1290        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1291        self.arvadosfile.add_writer(self)
1292
1293    def writable(self):
1294        return True
1295
1296    @_FileLikeObjectBase._before_close
1297    @retry_method
1298    def write(self, data, num_retries=None):
1299        if self.mode[0] == "a":
1300            self._filepos = self.size()
1301        self.arvadosfile.writeto(self._filepos, data, num_retries)
1302        self._filepos += len(data)
1303        return len(data)
1304
1305    @_FileLikeObjectBase._before_close
1306    @retry_method
1307    def writelines(self, seq, num_retries=None):
1308        for s in seq:
1309            self.write(s, num_retries=num_retries)
1310
1311    @_FileLikeObjectBase._before_close
1312    def truncate(self, size=None):
1313        if size is None:
1314            size = self._filepos
1315        self.arvadosfile.truncate(size)
1316
1317    @_FileLikeObjectBase._before_close
1318    def flush(self):
1319        self.arvadosfile.flush()
1320
1321    def close(self, flush=True):
1322        if not self.closed:
1323            self.arvadosfile.remove_writer(self, flush)
1324            super(ArvadosFileWriter, self).close()
1325
1326
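Writing follows the same pattern: bytes accumulate in buffer blocks, are flushed to Keep when the writer is closed (or `flush()` is called), and the collection must then be saved for the new manifest to be recorded. A sketch, again assuming a configured client; the collection name is a placeholder:

    import arvados.collection

    coll = arvados.collection.Collection()            # new, empty collection
    with coll.open("results/output.txt", "wb") as f:  # "wb" gives a writer
        f.write(b"first line\n")
        f.writelines([b"second line\n", b"third line\n"])
    # close() flushed the buffer block(s); persist the manifest:
    coll.save_new(name="arvfile writer example")      # placeholder name
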
1327class WrappableFile(object):
1328    """An interface to an Arvados file that's compatible with io wrappers.
1329
1330    """
1331    def __init__(self, f):
1332        self.f = f
1333        self.closed = False
1334    def close(self):
1335        self.closed = True
1336        return self.f.close()
1337    def flush(self):
1338        return self.f.flush()
1339    def read(self, *args, **kwargs):
1340        return self.f.read(*args, **kwargs)
1341    def readable(self):
1342        return self.f.readable()
1343    def readinto(self, *args, **kwargs):
1344        return self.f.readinto(*args, **kwargs)
1345    def seek(self, *args, **kwargs):
1346        return self.f.seek(*args, **kwargs)
1347    def seekable(self):
1348        return self.f.seekable()
1349    def tell(self):
1350        return self.f.tell()
1351    def writable(self):
1352        return self.f.writable()
1353    def write(self, *args, **kwargs):
1354        return self.f.write(*args, **kwargs)
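`WrappableFile` exists so these readers and writers can be layered under the standard `io` wrappers (this is how text-mode access is provided). A sketch, assuming `raw` is an `ArvadosFileReader` for a UTF-8 text file obtained elsewhere:

    import io
    from arvados.arvfile import WrappableFile

    # `raw` is assumed to be an ArvadosFileReader opened in binary mode.
    text = io.TextIOWrapper(WrappableFile(raw), encoding="utf-8")
    for line in text:
        print(line.rstrip())
    text.close()        # closes the wrapper and the underlying reader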
111            pos += self.size()
112        if pos < 0:
113            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
114        self._filepos = pos
115        return self._filepos
def tell(self):
117    def tell(self):
118        return self._filepos
def readable(self):
120    def readable(self):
121        return True
def writable(self):
123    def writable(self):
124        return False
def seekable(self):
126    def seekable(self):
127        return True
@retry_method
def readall(self, size=1048576, num_retries=None):
129    @_FileLikeObjectBase._before_close
130    @retry_method
131    def readall(self, size=2**20, num_retries=None):
132        while True:
133            data = self.read(size, num_retries=num_retries)
134            if len(data) == 0:
135                break
136            yield data
@retry_method
def readline(self, size=inf, num_retries=None):
138    @_FileLikeObjectBase._before_close
139    @retry_method
140    def readline(self, size=float('inf'), num_retries=None):
141        cache_pos, cache_data = self._readline_cache
142        if self.tell() == cache_pos:
143            data = [cache_data]
144            self._filepos += len(cache_data)
145        else:
146            data = [b'']
147        data_size = len(data[-1])
148        while (data_size < size) and (b'\n' not in data[-1]):
149            next_read = self.read(2 ** 20, num_retries=num_retries)
150            if not next_read:
151                break
152            data.append(next_read)
153            data_size += len(next_read)
154        data = b''.join(data)
155        try:
156            nextline_index = data.index(b'\n') + 1
157        except ValueError:
158            nextline_index = len(data)
159        nextline_index = min(nextline_index, size)
160        self._filepos -= len(data) - nextline_index
161        self._readline_cache = (self.tell(), data[nextline_index:])
162        return data[:nextline_index].decode()
@retry_method
def decompress(self, decompress, size, num_retries=None):
164    @_FileLikeObjectBase._before_close
165    @retry_method
166    def decompress(self, decompress, size, num_retries=None):
167        for segment in self.readall(size, num_retries=num_retries):
168            data = decompress(segment)
169            if data:
170                yield data
@retry_method
def readall_decompressed(self, size=1048576, num_retries=None):
172    @_FileLikeObjectBase._before_close
173    @retry_method
174    def readall_decompressed(self, size=2**20, num_retries=None):
175        self.seek(0)
176        if self.name.endswith('.bz2'):
177            dc = bz2.BZ2Decompressor()
178            return self.decompress(dc.decompress, size,
179                                   num_retries=num_retries)
180        elif self.name.endswith('.gz'):
181            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
182            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
183                                   size, num_retries=num_retries)
184        else:
185            return self.readall(size, num_retries=num_retries)
@retry_method
def readlines(self, sizehint=inf, num_retries=None):
187    @_FileLikeObjectBase._before_close
188    @retry_method
189    def readlines(self, sizehint=float('inf'), num_retries=None):
190        data = []
191        data_size = 0
192        for s in self.readall(num_retries=num_retries):
193            data.append(s)
194            data_size += len(s)
195            if data_size >= sizehint:
196                break
197        return b''.join(data).decode().splitlines(True)
def size(self):
199    def size(self):
200        raise IOError(errno.ENOSYS, "Not implemented")
def read(self, size, num_retries=None):
202    def read(self, size, num_retries=None):
203        raise IOError(errno.ENOSYS, "Not implemented")
def readfrom(self, start, size, num_retries=None):
205    def readfrom(self, start, size, num_retries=None):
206        raise IOError(errno.ENOSYS, "Not implemented")
def synchronized(orig_func):
209def synchronized(orig_func):
210    @functools.wraps(orig_func)
211    def synchronized_wrapper(self, *args, **kwargs):
212        with self.lock:
213            return orig_func(self, *args, **kwargs)
214    return synchronized_wrapper
class StateChangeError(builtins.Exception):
217class StateChangeError(Exception):
218    def __init__(self, message, state, nextstate):
219        super(StateChangeError, self).__init__(message)
220        self.state = state
221        self.nextstate = nextstate

Raised when an invalid state transition is attempted; records the current state and the requested next state.

StateChangeError(message, state, nextstate)
218    def __init__(self, message, state, nextstate):
219        super(StateChangeError, self).__init__(message)
220        self.state = state
221        self.nextstate = nextstate
state
nextstate
Inherited Members
builtins.BaseException
with_traceback
args
class NoopLock:
393class NoopLock(object):
394    def __enter__(self):
395        return self
396
397    def __exit__(self, exc_type, exc_value, traceback):
398        pass
399
400    def acquire(self, blocking=False):
401        pass
402
403    def release(self):
404        pass
def acquire(self, blocking=False):
400    def acquire(self, blocking=False):
401        pass
def release(self):
403    def release(self):
404        pass
def must_be_writable(orig_func):
407def must_be_writable(orig_func):
408    @functools.wraps(orig_func)
409    def must_be_writable_wrapper(self, *args, **kwargs):
410        if not self.writable():
411            raise IOError(errno.EROFS, "Collection is read-only.")
412        return orig_func(self, *args, **kwargs)
413    return must_be_writable_wrapper
class ArvadosFile:
 759class ArvadosFile(object):
 760    """Represent a file in a Collection.
 761
 762    ArvadosFile manages the underlying representation of a file in Keep as a
 763    sequence of segments spanning a set of blocks, and implements random
 764    read/write access.
 765
 766    This object may be accessed from multiple threads.
 767
 768    """
 769
 770    __slots__ = ('parent', 'name', '_writers', '_committed',
 771                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 772
 773    def __init__(self, parent, name, stream=[], segments=[]):
 774        """
 775        ArvadosFile constructor.
 776
 777        :stream:
 778          a list of Range objects representing a block stream
 779
 780        :segments:
 781          a list of Range objects representing segments
 782        """
 783        self.parent = parent
 784        self.name = name
 785        self._writers = set()
 786        self._committed = False
 787        self._segments = []
 788        self.lock = parent.root_collection().lock
 789        for s in segments:
 790            self._add_segment(stream, s.locator, s.range_size)
 791        self._current_bblock = None
 792        self._read_counter = 0
 793
 794    def writable(self):
 795        return self.parent.writable()
 796
 797    @synchronized
 798    def permission_expired(self, as_of_dt=None):
 799        """Returns True if any of the segment's locators is expired"""
 800        for r in self._segments:
 801            if KeepLocator(r.locator).permission_expired(as_of_dt):
 802                return True
 803        return False
 804
 805    @synchronized
 806    def has_remote_blocks(self):
 807        """Returns True if any of the segment's locators has a +R signature"""
 808
 809        for s in self._segments:
 810            if '+R' in s.locator:
 811                return True
 812        return False
 813
 814    @synchronized
 815    def _copy_remote_blocks(self, remote_blocks={}):
 816        """Ask Keep to copy remote blocks and point to their local copies.
 817
 818        This is called from the parent Collection.
 819
 820        :remote_blocks:
 821            Shared cache of remote to local block mappings. This is used to avoid
 822            doing extra work when blocks are shared by more than one file in
 823            different subdirectories.
 824        """
 825
 826        for s in self._segments:
 827            if '+R' in s.locator:
 828                try:
 829                    loc = remote_blocks[s.locator]
 830                except KeyError:
 831                    loc = self.parent._my_keep().refresh_signature(s.locator)
 832                    remote_blocks[s.locator] = loc
 833                s.locator = loc
 834                self.parent.set_committed(False)
 835        return remote_blocks
 836
 837    @synchronized
 838    def segments(self):
 839        return copy.copy(self._segments)
 840
 841    @synchronized
 842    def clone(self, new_parent, new_name):
 843        """Make a copy of this file."""
 844        cp = ArvadosFile(new_parent, new_name)
 845        cp.replace_contents(self)
 846        return cp
 847
 848    @must_be_writable
 849    @synchronized
 850    def replace_contents(self, other):
 851        """Replace segments of this file with segments from another `ArvadosFile` object."""
 852
 853        eventtype = TOK if self == other else MOD
 854
 855        map_loc = {}
 856        self._segments = []
 857        for other_segment in other.segments():
 858            new_loc = other_segment.locator
 859            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
 860                if other_segment.locator not in map_loc:
 861                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
 862                    if bufferblock.state() != _BufferBlock.WRITABLE:
 863                        map_loc[other_segment.locator] = bufferblock.locator()
 864                    else:
 865                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
 866                new_loc = map_loc[other_segment.locator]
 867
 868            self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 869
 870        self.set_committed(False)
 871        self.parent.notify(eventtype, self.parent, self.name, (self, self))
 872
 873    def __eq__(self, other):
 874        if other is self:
 875            return True
 876        if not isinstance(other, ArvadosFile):
 877            return False
 878
 879        othersegs = other.segments()
 880        with self.lock:
 881            if len(self._segments) != len(othersegs):
 882                return False
 883            for i in range(0, len(othersegs)):
 884                seg1 = self._segments[i]
 885                seg2 = othersegs[i]
 886                loc1 = seg1.locator
 887                loc2 = seg2.locator
 888
 889                if self.parent._my_block_manager().is_bufferblock(loc1):
 890                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
 891
 892                if other.parent._my_block_manager().is_bufferblock(loc2):
 893                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
 894
 895                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
 896                    seg1.range_start != seg2.range_start or
 897                    seg1.range_size != seg2.range_size or
 898                    seg1.segment_offset != seg2.segment_offset):
 899                    return False
 900
 901        return True
 902
 903    def __ne__(self, other):
 904        return not self.__eq__(other)
 905
 906    @synchronized
 907    def set_segments(self, segs):
 908        self._segments = segs
 909
 910    @synchronized
 911    def set_committed(self, value=True):
 912        """Set committed flag.
 913
 914        If value is True, set committed to be True.
 915
 916        If value is False, set committed to be False for this and all parents.
 917        """
 918        if value == self._committed:
 919            return
 920        self._committed = value
 921        if self._committed is False and self.parent is not None:
 922            self.parent.set_committed(False)
 923
 924    @synchronized
 925    def committed(self):
 926        """Get whether this is committed or not."""
 927        return self._committed
 928
 929    @synchronized
 930    def add_writer(self, writer):
 931        """Add an ArvadosFileWriter reference to the list of writers"""
 932        if isinstance(writer, ArvadosFileWriter):
 933            self._writers.add(writer)
 934
 935    @synchronized
 936    def remove_writer(self, writer, flush):
 937        """
 938        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 939        and do some block maintenance tasks.
 940        """
 941        self._writers.remove(writer)
 942
 943        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 944            # File writer closed, not small enough for repacking
 945            self.flush()
 946        elif self.closed():
 947            # All writers closed and size is adequate for repacking
 948            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 949
 950    def closed(self):
 951        """
 952        Get whether this is closed or not. When the writers list is empty, the file
 953        is supposed to be closed.
 954        """
 955        return len(self._writers) == 0
 956
 957    @must_be_writable
 958    @synchronized
 959    def truncate(self, size):
 960        """Shrink or expand the size of the file.
 961
 962        If `size` is less than the size of the file, the file contents after
 963        `size` will be discarded.  If `size` is greater than the current size
 964        of the file, it will be filled with zero bytes.
 965
 966        """
 967        if size < self.size():
 968            new_segs = []
 969            for r in self._segments:
 970                range_end = r.range_start+r.range_size
 971                if r.range_start >= size:
 972                    # segment is past the truncate size, all done
 973                    break
 974                elif size < range_end:
 975                    nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0)
 976                    nr.segment_offset = r.segment_offset
 977                    new_segs.append(nr)
 978                    break
 979                else:
 980                    new_segs.append(r)
 981
 982            self._segments = new_segs
 983            self.set_committed(False)
 984        elif size > self.size():
 985            padding = self.parent._my_block_manager().get_padding_block()
 986            diff = size - self.size()
 987            while diff > config.KEEP_BLOCK_SIZE:
 988                self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
 989                diff -= config.KEEP_BLOCK_SIZE
 990            if diff > 0:
 991                self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0))
 992            self.set_committed(False)
 993        else:
 994            # size == self.size()
 995            pass
 996
 997    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
 998        """Read up to `size` bytes from the file starting at `offset`.
 999
1000        Arguments:
1001
1002        * exact: bool --- If False (default), return less data than
1003         requested if the read crosses a block boundary and the next
1004         block isn't cached.  If True, only return less data than
1005         requested when hitting EOF.
1006
1007        * return_memoryview: bool --- If False (default) return a
1008          `bytes` object, which may entail making a copy in some
1009          situations.  If True, return a `memoryview` object which may
1010          avoid making a copy, but may be incompatible with code
1011          expecting a `bytes` object.
1012
1013        """
1014
1015        with self.lock:
1016            if size == 0 or offset >= self.size():
1017                return memoryview(b'') if return_memoryview else b''
1018            readsegs = streams.locators_and_ranges(self._segments, offset, size)
1019
1020            prefetch = None
1021            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1022            if prefetch_lookahead:
1023                # Doing prefetch on every read() call is surprisingly expensive
1024                # when we're trying to deliver data at 600+ MiBps and want
1025                # the read() fast path to be as lightweight as possible.
1026                #
1027                # Only prefetching every 128 read operations
1028                # dramatically reduces the overhead while still
1029                # getting the benefit of prefetching (e.g. when
1030                # reading 128 KiB at a time, it checks for prefetch
1031                # every 16 MiB).
1032                self._read_counter = (self._read_counter+1) % 128
1033                if self._read_counter == 1:
1034                    prefetch = streams.locators_and_ranges(
1035                        self._segments,
1036                        offset + size,
1037                        config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1038                        limit=(1+prefetch_lookahead),
1039                    )
1040
1041        locs = set()
1042        data = []
1043        for lr in readsegs:
1044            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1045            if block:
1046                blockview = memoryview(block)
1047                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1048                locs.add(lr.locator)
1049            else:
1050                break
1051
1052        if prefetch:
1053            for lr in prefetch:
1054                if lr.locator not in locs:
1055                    self.parent._my_block_manager().block_prefetch(lr.locator)
1056                    locs.add(lr.locator)
1057
1058        if len(data) == 1:
1059            return data[0] if return_memoryview else data[0].tobytes()
1060        else:
1061            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1062
1063
1064    @must_be_writable
1065    @synchronized
1066    def writeto(self, offset, data, num_retries):
1067        """Write `data` to the file starting at `offset`.
1068
1069        This will update existing bytes and/or extend the size of the file as
1070        necessary.
1071
1072        """
1073        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1074            data = data.encode()
1075        if len(data) == 0:
1076            return
1077
1078        if offset > self.size():
1079            self.truncate(offset)
1080
1081        if len(data) > config.KEEP_BLOCK_SIZE:
1082            # Chunk it up into smaller writes
1083            n = 0
1084            dataview = memoryview(data)
1085            while n < len(data):
1086                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1087                n += config.KEEP_BLOCK_SIZE
1088            return
1089
1090        self.set_committed(False)
1091
1092        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1093            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1094
1095        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1096            self._current_bblock.repack_writes()
1097            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1098                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1099                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1100
1101        self._current_bblock.append(data)
1102        streams.replace_range(
1103            self._segments,
1104            offset,
1105            len(data),
1106            self._current_bblock.blockid,
1107            self._current_bblock.write_pointer - len(data),
1108        )
1109        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1110        return len(data)
1111
1112    @synchronized
1113    def flush(self, sync=True, num_retries=0):
1114        """Flush the current bufferblock to Keep.
1115
1116        :sync:
1117          If True, commit block synchronously, wait until buffer block has been written.
1118          If False, commit block asynchronously, return immediately after putting block into
1119          the keep put queue.
1120        """
1121        if self.committed():
1122            return
1123
1124        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1125            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1126                self._current_bblock.repack_writes()
1127            if self._current_bblock.state() != _BufferBlock.DELETED:
1128                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1129
1130        if sync:
1131            to_delete = set()
1132            for s in self._segments:
1133                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1134                if bb:
1135                    if bb.state() != _BufferBlock.COMMITTED:
1136                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1137                    to_delete.add(s.locator)
1138                    s.locator = bb.locator()
1139            for s in to_delete:
1140                # Don't delete the bufferblock if it's owned by many files. It'll be
1141                # deleted after all of its owners are flush()ed.
1142                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1143                    self.parent._my_block_manager().delete_bufferblock(s)
1144
1145        self.parent.notify(MOD, self.parent, self.name, (self, self))
1146
1147    @must_be_writable
1148    @synchronized
1149    def add_segment(self, blocks, pos, size):
1150        """Add a segment to the end of the file.
1151
1152        `pos` and `offset` reference a section of the stream described by
1153        `blocks` (a list of Range objects)
1154
1155        """
1156        self._add_segment(blocks, pos, size)
1157
1158    def _add_segment(self, blocks, pos, size):
1159        """Internal implementation of add_segment."""
1160        self.set_committed(False)
1161        for lr in streams.locators_and_ranges(blocks, pos, size):
1162            last = self._segments[-1] if self._segments else streams.Range(0, 0, 0, 0)
1163            r = streams.Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1164            self._segments.append(r)
1165
1166    @synchronized
1167    def size(self):
1168        """Get the file size."""
1169        if self._segments:
1170            n = self._segments[-1]
1171            return n.range_start + n.range_size
1172        else:
1173            return 0
1174
1175    @synchronized
1176    def manifest_text(self, stream_name=".", portable_locators=False,
1177                      normalize=False, only_committed=False):
1178        buf = ""
1179        filestream = []
1180        for segment in self._segments:
1181            loc = segment.locator
1182            if self.parent._my_block_manager().is_bufferblock(loc):
1183                if only_committed:
1184                    continue
1185                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1186            if portable_locators:
1187                loc = KeepLocator(loc).stripped()
1188            filestream.append(streams.LocatorAndRange(
1189                loc,
1190                KeepLocator(loc).size,
1191                segment.segment_offset,
1192                segment.range_size,
1193            ))
1194        buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream}))
1195        buf += "\n"
1196        return buf
1197
1198    @must_be_writable
1199    @synchronized
1200    def _reparent(self, newparent, newname):
1201        self.set_committed(False)
1202        self.flush(sync=True)
1203        self.parent.remove(self.name)
1204        self.parent = newparent
1205        self.name = newname
1206        self.lock = self.parent.root_collection().lock

Represent a file in a Collection.

ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.

This object may be accessed from multiple threads.
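
ArvadosFile objects are normally obtained through a Collection rather than constructed directly. A minimal sketch, assuming a configured Arvados client environment (ARVADOS_API_HOST/ARVADOS_API_TOKEN); the file name is illustrative:

    import arvados.collection

    coll = arvados.collection.Collection()
    with coll.open('hello.txt', 'wb') as writer:   # ArvadosFileWriter backed by a new ArvadosFile
        writer.write(b'hello world\n')
    with coll.open('hello.txt', 'rb') as reader:   # ArvadosFileReader over the same ArvadosFile
        print(reader.read())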

ArvadosFile(parent, name, stream=[], segments=[])
773    def __init__(self, parent, name, stream=[], segments=[]):
774        """
775        ArvadosFile constructor.
776
777        :stream:
778          a list of Range objects representing a block stream
779
780        :segments:
781          a list of Range objects representing segments
782        """
783        self.parent = parent
784        self.name = name
785        self._writers = set()
786        self._committed = False
787        self._segments = []
788        self.lock = parent.root_collection().lock
789        for s in segments:
790            self._add_segment(stream, s.locator, s.range_size)
791        self._current_bblock = None
792        self._read_counter = 0

ArvadosFile constructor.

:stream: a list of Range objects representing a block stream

:segments: a list of Range objects representing segments

parent
name
lock
def writable(self):
794    def writable(self):
795        return self.parent.writable()
@synchronized
def permission_expired(self, as_of_dt=None):
797    @synchronized
798    def permission_expired(self, as_of_dt=None):
799        """Returns True if any of the segment's locators is expired"""
800        for r in self._segments:
801            if KeepLocator(r.locator).permission_expired(as_of_dt):
802                return True
803        return False

Returns True if any of the file’s segment locators has expired

@synchronized
def has_remote_blocks(self):
805    @synchronized
806    def has_remote_blocks(self):
807        """Returns True if any of the segment's locators has a +R signature"""
808
809        for s in self._segments:
810            if '+R' in s.locator:
811                return True
812        return False

Returns True if any of the file’s segment locators has a +R signature

@synchronized
def segments(self):
837    @synchronized
838    def segments(self):
839        return copy.copy(self._segments)
@synchronized
def clone(self, new_parent, new_name):
841    @synchronized
842    def clone(self, new_parent, new_name):
843        """Make a copy of this file."""
844        cp = ArvadosFile(new_parent, new_name)
845        cp.replace_contents(self)
846        return cp

Make a copy of this file.

@must_be_writable
@synchronized
def replace_contents(self, other):
848    @must_be_writable
849    @synchronized
850    def replace_contents(self, other):
851        """Replace segments of this file with segments from another `ArvadosFile` object."""
852
853        eventtype = TOK if self == other else MOD
854
855        map_loc = {}
856        self._segments = []
857        for other_segment in other.segments():
858            new_loc = other_segment.locator
859            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
860                if other_segment.locator not in map_loc:
861                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
862                    if bufferblock.state() != _BufferBlock.WRITABLE:
863                        map_loc[other_segment.locator] = bufferblock.locator()
864                    else:
865                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
866                new_loc = map_loc[other_segment.locator]
867
868            self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
869
870        self.set_committed(False)
871        self.parent.notify(eventtype, self.parent, self.name, (self, self))

Replace segments of this file with segments from another ArvadosFile object.

@synchronized
def set_segments(self, segs):
906    @synchronized
907    def set_segments(self, segs):
908        self._segments = segs
@synchronized
def set_committed(self, value=True):
910    @synchronized
911    def set_committed(self, value=True):
912        """Set committed flag.
913
914        If value is True, set committed to be True.
915
916        If value is False, set committed to be False for this and all parents.
917        """
918        if value == self._committed:
919            return
920        self._committed = value
921        if self._committed is False and self.parent is not None:
922            self.parent.set_committed(False)

Set committed flag.

If value is True, set committed to be True.

If value is False, set committed to be False for this and all parents.

@synchronized
def committed(self):
924    @synchronized
925    def committed(self):
926        """Get whether this is committed or not."""
927        return self._committed

Get whether this is committed or not.

@synchronized
def add_writer(self, writer):
929    @synchronized
930    def add_writer(self, writer):
931        """Add an ArvadosFileWriter reference to the list of writers"""
932        if isinstance(writer, ArvadosFileWriter):
933            self._writers.add(writer)

Add an ArvadosFileWriter reference to the list of writers

@synchronized
def remove_writer(self, writer, flush):
935    @synchronized
936    def remove_writer(self, writer, flush):
937        """
938        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
939        and do some block maintenance tasks.
940        """
941        self._writers.remove(writer)
942
943        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
944            # File writer closed, not small enough for repacking
945            self.flush()
946        elif self.closed():
947            # All writers closed and size is adequate for repacking
948            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks.

def closed(self):
950    def closed(self):
951        """
952        Get whether this is closed or not. When the writers list is empty, the file
953        is supposed to be closed.
954        """
955        return len(self._writers) == 0

Get whether this file is closed. The file is considered closed when its writers list is empty.

@must_be_writable
@synchronized
def truncate(self, size):
957    @must_be_writable
958    @synchronized
959    def truncate(self, size):
960        """Shrink or expand the size of the file.
961
962        If `size` is less than the size of the file, the file contents after
963        `size` will be discarded.  If `size` is greater than the current size
964        of the file, it will be filled with zero bytes.
965
966        """
967        if size < self.size():
968            new_segs = []
969            for r in self._segments:
970                range_end = r.range_start+r.range_size
971                if r.range_start >= size:
972                    # segment is past the truncate size, all done
973                    break
974                elif size < range_end:
975                    nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0)
976                    nr.segment_offset = r.segment_offset
977                    new_segs.append(nr)
978                    break
979                else:
980                    new_segs.append(r)
981
982            self._segments = new_segs
983            self.set_committed(False)
984        elif size > self.size():
985            padding = self.parent._my_block_manager().get_padding_block()
986            diff = size - self.size()
987            while diff > config.KEEP_BLOCK_SIZE:
988                self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
989                diff -= config.KEEP_BLOCK_SIZE
990            if diff > 0:
991                self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0))
992            self.set_committed(False)
993        else:
994            # size == self.size()
995            pass

Shrink or expand the size of the file.

If size is less than the size of the file, the file contents after size will be discarded. If size is greater than the current size of the file, it will be filled with zero bytes.
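
A hypothetical sketch, where afile is an ArvadosFile belonging to a writable collection:

    afile.writeto(0, b'0123456789', 0)   # file now contains 10 bytes
    afile.truncate(4)                    # contents after byte 4 are discarded
    afile.truncate(8)                    # extended to 8 bytes, padded with zero bytes
    assert afile.size() == 8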

def readfrom( self, offset, size, num_retries, exact=False, return_memoryview=False):
 997    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
 998        """Read up to `size` bytes from the file starting at `offset`.
 999
1000        Arguments:
1001
1002        * exact: bool --- If False (default), return less data than
1003         requested if the read crosses a block boundary and the next
1004         block isn't cached.  If True, only return less data than
1005         requested when hitting EOF.
1006
1007        * return_memoryview: bool --- If False (default) return a
1008          `bytes` object, which may entail making a copy in some
1009          situations.  If True, return a `memoryview` object which may
1010          avoid making a copy, but may be incompatible with code
1011          expecting a `bytes` object.
1012
1013        """
1014
1015        with self.lock:
1016            if size == 0 or offset >= self.size():
1017                return memoryview(b'') if return_memoryview else b''
1018            readsegs = streams.locators_and_ranges(self._segments, offset, size)
1019
1020            prefetch = None
1021            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1022            if prefetch_lookahead:
1023                # Doing prefetch on every read() call is surprisingly expensive
1024                # when we're trying to deliver data at 600+ MiBps and want
1025                # the read() fast path to be as lightweight as possible.
1026                #
1027                # Only prefetching every 128 read operations
1028                # dramatically reduces the overhead while still
1029                # getting the benefit of prefetching (e.g. when
1030                # reading 128 KiB at a time, it checks for prefetch
1031                # every 16 MiB).
1032                self._read_counter = (self._read_counter+1) % 128
1033                if self._read_counter == 1:
1034                    prefetch = streams.locators_and_ranges(
1035                        self._segments,
1036                        offset + size,
1037                        config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1038                        limit=(1+prefetch_lookahead),
1039                    )
1040
1041        locs = set()
1042        data = []
1043        for lr in readsegs:
1044            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1045            if block:
1046                blockview = memoryview(block)
1047                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1048                locs.add(lr.locator)
1049            else:
1050                break
1051
1052        if prefetch:
1053            for lr in prefetch:
1054                if lr.locator not in locs:
1055                    self.parent._my_block_manager().block_prefetch(lr.locator)
1056                    locs.add(lr.locator)
1057
1058        if len(data) == 1:
1059            return data[0] if return_memoryview else data[0].tobytes()
1060        else:
1061            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)

Read up to size bytes from the file starting at offset.

Arguments:

  • exact: bool — If False (default), return less data than requested if the read crosses a block boundary and the next block isn’t cached. If True, only return less data than requested when hitting EOF.

  • return_memoryview: bool — If False (default) return a bytes object, which may entail making a copy in some situations. If True, return a memoryview object which may avoid making a copy, but may be incompatible with code expecting a bytes object.
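
A hypothetical sketch, where afile is an ArvadosFile (no file pointer is involved at this level):

    chunk = afile.readfrom(0, 2**20, num_retries=2)                         # bytes
    view = afile.readfrom(0, 2**20, num_retries=2, return_memoryview=True)  # memoryview; may avoid a copy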

@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
1064    @must_be_writable
1065    @synchronized
1066    def writeto(self, offset, data, num_retries):
1067        """Write `data` to the file starting at `offset`.
1068
1069        This will update existing bytes and/or extend the size of the file as
1070        necessary.
1071
1072        """
1073        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1074            data = data.encode()
1075        if len(data) == 0:
1076            return
1077
1078        if offset > self.size():
1079            self.truncate(offset)
1080
1081        if len(data) > config.KEEP_BLOCK_SIZE:
1082            # Chunk it up into smaller writes
1083            n = 0
1084            dataview = memoryview(data)
1085            while n < len(data):
1086                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1087                n += config.KEEP_BLOCK_SIZE
1088            return
1089
1090        self.set_committed(False)
1091
1092        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1093            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1094
1095        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1096            self._current_bblock.repack_writes()
1097            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1098                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1099                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1100
1101        self._current_bblock.append(data)
1102        streams.replace_range(
1103            self._segments,
1104            offset,
1105            len(data),
1106            self._current_bblock.blockid,
1107            self._current_bblock.write_pointer - len(data),
1108        )
1109        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1110        return len(data)

Write data to the file starting at offset.

This will update existing bytes and/or extend the size of the file as necessary.
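
A hypothetical sketch, where afile is a writable ArvadosFile:

    afile.writeto(0, b'HEADER', 0)            # overwrite the first 6 bytes in place
    afile.writeto(afile.size(), b'tail', 0)   # append at the current end of the file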

@synchronized
def flush(self, sync=True, num_retries=0):
1112    @synchronized
1113    def flush(self, sync=True, num_retries=0):
1114        """Flush the current bufferblock to Keep.
1115
1116        :sync:
1117          If True, commit block synchronously, wait until buffer block has been written.
1118          If False, commit block asynchronously, return immediately after putting block into
1119          the keep put queue.
1120        """
1121        if self.committed():
1122            return
1123
1124        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1125            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1126                self._current_bblock.repack_writes()
1127            if self._current_bblock.state() != _BufferBlock.DELETED:
1128                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1129
1130        if sync:
1131            to_delete = set()
1132            for s in self._segments:
1133                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1134                if bb:
1135                    if bb.state() != _BufferBlock.COMMITTED:
1136                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1137                    to_delete.add(s.locator)
1138                    s.locator = bb.locator()
1139            for s in to_delete:
1140                # Don't delete the bufferblock if it's owned by many files. It'll be
1141                # deleted after all of its owners are flush()ed.
1142                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1143                    self.parent._my_block_manager().delete_bufferblock(s)
1144
1145        self.parent.notify(MOD, self.parent, self.name, (self, self))

Flush the current bufferblock to Keep.

:sync: If True, commit the block synchronously and wait until the buffer block has been written. If False, commit the block asynchronously and return immediately after putting the block on the Keep put queue.
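
A hypothetical sketch, where afile is a writable ArvadosFile with buffered data:

    afile.flush(sync=False)   # queue the current buffer block for upload and return immediately
    afile.flush(sync=True)    # wait until every buffer block referenced by this file is in Keep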

@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
1147    @must_be_writable
1148    @synchronized
1149    def add_segment(self, blocks, pos, size):
1150        """Add a segment to the end of the file.
1151
1152        `pos` and `offset` reference a section of the stream described by
1153        `blocks` (a list of Range objects)
1154
1155        """
1156        self._add_segment(blocks, pos, size)

Add a segment to the end of the file.

pos and size reference a section of the stream described by blocks (a list of Range objects)

@synchronized
def size(self):
1166    @synchronized
1167    def size(self):
1168        """Get the file size."""
1169        if self._segments:
1170            n = self._segments[-1]
1171            return n.range_start + n.range_size
1172        else:
1173            return 0

Get the file size.

@synchronized
def manifest_text( self, stream_name='.', portable_locators=False, normalize=False, only_committed=False):
1175    @synchronized
1176    def manifest_text(self, stream_name=".", portable_locators=False,
1177                      normalize=False, only_committed=False):
1178        buf = ""
1179        filestream = []
1180        for segment in self._segments:
1181            loc = segment.locator
1182            if self.parent._my_block_manager().is_bufferblock(loc):
1183                if only_committed:
1184                    continue
1185                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1186            if portable_locators:
1187                loc = KeepLocator(loc).stripped()
1188            filestream.append(streams.LocatorAndRange(
1189                loc,
1190                KeepLocator(loc).size,
1191                segment.segment_offset,
1192                segment.range_size,
1193            ))
1194        buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream}))
1195        buf += "\n"
1196        return buf
fuse_entry
class ArvadosFileReader(ArvadosFileReaderBase):
1209class ArvadosFileReader(ArvadosFileReaderBase):
1210    """Wraps ArvadosFile in a file-like object supporting reading only.
1211
1212    Be aware that this class is NOT thread safe as there is no locking around
1213    updating file pointer.
1214
1215    """
1216
1217    def __init__(self, arvadosfile, mode="r", num_retries=None):
1218        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1219        self.arvadosfile = arvadosfile
1220
1221    def size(self):
1222        return self.arvadosfile.size()
1223
1224    def stream_name(self):
1225        return self.arvadosfile.parent.stream_name()
1226
1227    def readinto(self, b):
1228        data = self.read(len(b))
1229        b[:len(data)] = data
1230        return len(data)
1231
1232    @_FileLikeObjectBase._before_close
1233    @retry_method
1234    def read(self, size=-1, num_retries=None, return_memoryview=False):
1235        """Read up to `size` bytes from the file and return the result.
1236
1237        Starts at the current file position.  If `size` is negative or None,
1238        read the entire remainder of the file.
1239
1240        Returns None if the file pointer is at the end of the file.
1241
1242        Returns a `bytes` object, unless `return_memoryview` is True,
1243        in which case it returns a memory view, which may avoid an
1244        unnecessary data copy in some situations.
1245
1246        """
1247        if size is None or size < 0:
1248            data = []
1249            #
1250            # specify exact=False, return_memoryview=True here so that we
1251            # only copy data once into the final buffer.
1252            #
1253            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1254            while rd:
1255                data.append(rd)
1256                self._filepos += len(rd)
1257                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1258            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1259        else:
1260            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1261            self._filepos += len(data)
1262            return data
1263
1264    @_FileLikeObjectBase._before_close
1265    @retry_method
1266    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1267        """Read up to `size` bytes from the stream, starting at the specified file offset.
1268
1269        This method does not change the file position.
1270
1271        Returns a `bytes` object, unless `return_memoryview` is True,
1272        in which case it returns a memory view, which may avoid an
1273        unnecessary data copy in some situations.
1274
1275        """
1276        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
1277
1278    def flush(self):
1279        pass

Wraps ArvadosFile in a file-like object supporting reading only.

Be aware that this class is NOT thread safe, as there is no locking around updates to the file pointer.
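
A minimal sketch, assuming an existing collection (identified here by a hypothetical UUID) and a configured Arvados client:

    import arvados.collection

    coll = arvados.collection.CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')  # hypothetical UUID
    with coll.open('data.txt', 'rb') as reader:   # ArvadosFileReader
        for line in reader:                       # iteration yields one line at a time
            print(line.rstrip())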

ArvadosFileReader(arvadosfile, mode='r', num_retries=None)
1217    def __init__(self, arvadosfile, mode="r", num_retries=None):
1218        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1219        self.arvadosfile = arvadosfile
arvadosfile
def size(self):
1221    def size(self):
1222        return self.arvadosfile.size()
def stream_name(self):
1224    def stream_name(self):
1225        return self.arvadosfile.parent.stream_name()
def readinto(self, b):
1227    def readinto(self, b):
1228        data = self.read(len(b))
1229        b[:len(data)] = data
1230        return len(data)
@retry_method
def read(self, size=-1, num_retries=None, return_memoryview=False):
1232    @_FileLikeObjectBase._before_close
1233    @retry_method
1234    def read(self, size=-1, num_retries=None, return_memoryview=False):
1235        """Read up to `size` bytes from the file and return the result.
1236
1237        Starts at the current file position.  If `size` is negative or None,
1238        read the entire remainder of the file.
1239
1240        Returns None if the file pointer is at the end of the file.
1241
1242        Returns a `bytes` object, unless `return_memoryview` is True,
1243        in which case it returns a memory view, which may avoid an
1244        unnecessary data copy in some situations.
1245
1246        """
1247        if size is None or size < 0:
1248            data = []
1249            #
1250            # specify exact=False, return_memoryview=True here so that we
1251            # only copy data once into the final buffer.
1252            #
1253            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1254            while rd:
1255                data.append(rd)
1256                self._filepos += len(rd)
1257                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1258            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1259        else:
1260            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1261            self._filepos += len(data)
1262            return data

Read up to size bytes from the file and return the result.

Starts at the current file position. If size is negative or None, read the entire remainder of the file.

Returns an empty result if the file pointer is already at the end of the file.

Returns a bytes object, unless return_memoryview is True, in which case it returns a memory view, which may avoid an unnecessary data copy in some situations.
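
A hypothetical sketch, where reader is an ArvadosFileReader:

    header = reader.read(64)    # up to 64 bytes from the current position
    rest = reader.read()        # the remainder of the file as a single bytes object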

@retry_method
def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1264    @_FileLikeObjectBase._before_close
1265    @retry_method
1266    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1267        """Read up to `size` bytes from the stream, starting at the specified file offset.
1268
1269        This method does not change the file position.
1270
1271        Returns a `bytes` object, unless `return_memoryview` is True,
1272        in which case it returns a memory view, which may avoid an
1273        unnecessary data copy in some situations.
1274
1275        """
1276        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)

Read up to size bytes from the stream, starting at the specified file offset.

This method does not change the file position.

Returns a bytes object, unless return_memoryview is True, in which case it returns a memory view, which may avoid an unnecessary data copy in some situations.

def flush(self):
1278    def flush(self):
1279        pass
class ArvadosFileWriter(ArvadosFileReader):
1282class ArvadosFileWriter(ArvadosFileReader):
1283    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1284
1285    Be aware that this class is NOT thread safe as there is no locking around
1286    updating file pointer.
1287
1288    """
1289
1290    def __init__(self, arvadosfile, mode, num_retries=None):
1291        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1292        self.arvadosfile.add_writer(self)
1293
1294    def writable(self):
1295        return True
1296
1297    @_FileLikeObjectBase._before_close
1298    @retry_method
1299    def write(self, data, num_retries=None):
1300        if self.mode[0] == "a":
1301            self._filepos = self.size()
1302        self.arvadosfile.writeto(self._filepos, data, num_retries)
1303        self._filepos += len(data)
1304        return len(data)
1305
1306    @_FileLikeObjectBase._before_close
1307    @retry_method
1308    def writelines(self, seq, num_retries=None):
1309        for s in seq:
1310            self.write(s, num_retries=num_retries)
1311
1312    @_FileLikeObjectBase._before_close
1313    def truncate(self, size=None):
1314        if size is None:
1315            size = self._filepos
1316        self.arvadosfile.truncate(size)
1317
1318    @_FileLikeObjectBase._before_close
1319    def flush(self):
1320        self.arvadosfile.flush()
1321
1322    def close(self, flush=True):
1323        if not self.closed:
1324            self.arvadosfile.remove_writer(self, flush)
1325            super(ArvadosFileWriter, self).close()

Wraps ArvadosFile in a file-like object supporting both reading and writing.

Be aware that this class is NOT thread safe, as there is no locking around updates to the file pointer.
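
A minimal sketch, assuming a writable Collection and a configured Arvados client; the file and collection names are illustrative:

    import arvados.collection

    coll = arvados.collection.Collection()
    with coll.open('log.txt', 'wb') as writer:                    # ArvadosFileWriter
        writer.write(b'first line\n')
        writer.writelines([b'second line\n', b'third line\n'])
    coll.save_new(name='arvfile example')                         # persist the manifest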

ArvadosFileWriter(arvadosfile, mode, num_retries=None)
1290    def __init__(self, arvadosfile, mode, num_retries=None):
1291        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1292        self.arvadosfile.add_writer(self)
def writable(self):
1294    def writable(self):
1295        return True
@retry_method
def write(self, data, num_retries=None):
1297    @_FileLikeObjectBase._before_close
1298    @retry_method
1299    def write(self, data, num_retries=None):
1300        if self.mode[0] == "a":
1301            self._filepos = self.size()
1302        self.arvadosfile.writeto(self._filepos, data, num_retries)
1303        self._filepos += len(data)
1304        return len(data)
@retry_method
def writelines(self, seq, num_retries=None):
1306    @_FileLikeObjectBase._before_close
1307    @retry_method
1308    def writelines(self, seq, num_retries=None):
1309        for s in seq:
1310            self.write(s, num_retries=num_retries)
def truncate(self, size=None):
1312    @_FileLikeObjectBase._before_close
1313    def truncate(self, size=None):
1314        if size is None:
1315            size = self._filepos
1316        self.arvadosfile.truncate(size)
def flush(self):
1318    @_FileLikeObjectBase._before_close
1319    def flush(self):
1320        self.arvadosfile.flush()
def close(self, flush=True):
1322    def close(self, flush=True):
1323        if not self.closed:
1324            self.arvadosfile.remove_writer(self, flush)
1325            super(ArvadosFileWriter, self).close()
class WrappableFile:
1328class WrappableFile(object):
1329    """An interface to an Arvados file that's compatible with io wrappers.
1330
1331    """
1332    def __init__(self, f):
1333        self.f = f
1334        self.closed = False
1335    def close(self):
1336        self.closed = True
1337        return self.f.close()
1338    def flush(self):
1339        return self.f.flush()
1340    def read(self, *args, **kwargs):
1341        return self.f.read(*args, **kwargs)
1342    def readable(self):
1343        return self.f.readable()
1344    def readinto(self, *args, **kwargs):
1345        return self.f.readinto(*args, **kwargs)
1346    def seek(self, *args, **kwargs):
1347        return self.f.seek(*args, **kwargs)
1348    def seekable(self):
1349        return self.f.seekable()
1350    def tell(self):
1351        return self.f.tell()
1352    def writable(self):
1353        return self.f.writable()
1354    def write(self, *args, **kwargs):
1355        return self.f.write(*args, **kwargs)

An interface to an Arvados file that’s compatible with io wrappers.
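
A hypothetical sketch: wrapping an ArvadosFileReader so it can be handed to standard io wrappers such as io.TextIOWrapper:

    import io
    from arvados.arvfile import WrappableFile

    wrapped = WrappableFile(reader)                      # 'reader' is an ArvadosFileReader
    text = io.TextIOWrapper(wrapped, encoding='utf-8')
    first_line = text.readline()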

WrappableFile(f)
1332    def __init__(self, f):
1333        self.f = f
1334        self.closed = False
f
closed
def close(self):
1335    def close(self):
1336        self.closed = True
1337        return self.f.close()
def flush(self):
1338    def flush(self):
1339        return self.f.flush()
def read(self, *args, **kwargs):
1340    def read(self, *args, **kwargs):
1341        return self.f.read(*args, **kwargs)
def readable(self):
1342    def readable(self):
1343        return self.f.readable()
def readinto(self, *args, **kwargs):
1344    def readinto(self, *args, **kwargs):
1345        return self.f.readinto(*args, **kwargs)
def seek(self, *args, **kwargs):
1346    def seek(self, *args, **kwargs):
1347        return self.f.seek(*args, **kwargs)
def seekable(self):
1348    def seekable(self):
1349        return self.f.seekable()
def tell(self):
1350    def tell(self):
1351        return self.f.tell()
def writable(self):
1352    def writable(self):
1353        return self.f.writable()
def write(self, *args, **kwargs):
1354    def write(self, *args, **kwargs):
1355        return self.f.write(*args, **kwargs)