arvados.arvfile

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4
   5from __future__ import absolute_import
   6from __future__ import division
   7from future import standard_library
   8from future.utils import listitems, listvalues
   9standard_library.install_aliases()
  10from builtins import range
  11from builtins import object
  12import bz2
  13import collections
  14import copy
  15import errno
  16import functools
  17import hashlib
  18import logging
  19import os
  20import queue
  21import re
  22import sys
  23import threading
  24import uuid
  25import zlib
  26
  27from . import config
  28from .errors import KeepWriteError, AssertionError, ArgumentError
  29from .keep import KeepLocator
  30from ._normalize_stream import normalize_stream
  31from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
  32from .retry import retry_method
  33
  34MOD = "mod"
  35WRITE = "write"
  36
  37_logger = logging.getLogger('arvados.arvfile')
  38
  39def split(path):
  40    """split(path) -> streamname, filename
  41
  42    Separate the stream name and file name in a /-separated stream path and
  43    return a tuple (stream_name, file_name).  If no stream name is available,
  44    assume '.'.
  45
  46    """
  47    try:
  48        stream_name, file_name = path.rsplit('/', 1)
  49    except ValueError:  # No / in string
  50        stream_name, file_name = '.', path
  51    return stream_name, file_name
  52
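For illustration, here is how split() behaves on a couple of hypothetical stream paths (a minimal sketch, not part of the module):

    from arvados.arvfile import split

    assert split("data/reads/sample1.fastq") == ("data/reads", "sample1.fastq")
    assert split("sample1.fastq") == (".", "sample1.fastq")   # no '/', stream defaults to '.'
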
  53
  54class UnownedBlockError(Exception):
  55    """Raised when there's a writable block without an owner on the BlockManager."""
  56    pass
  57
  58
  59class _FileLikeObjectBase(object):
  60    def __init__(self, name, mode):
  61        self.name = name
  62        self.mode = mode
  63        self.closed = False
  64
  65    @staticmethod
  66    def _before_close(orig_func):
  67        @functools.wraps(orig_func)
  68        def before_close_wrapper(self, *args, **kwargs):
  69            if self.closed:
  70                raise ValueError("I/O operation on closed stream file")
  71            return orig_func(self, *args, **kwargs)
  72        return before_close_wrapper
  73
  74    def __enter__(self):
  75        return self
  76
  77    def __exit__(self, exc_type, exc_value, traceback):
  78        try:
  79            self.close()
  80        except Exception:
  81            if exc_type is None:
  82                raise
  83
  84    def close(self):
  85        self.closed = True
  86
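_before_close above is a decorator that makes I/O methods refuse to run once close() has been called. A standalone sketch of the same guard pattern, using illustrative names that are not part of the SDK:

    import functools

    def before_close(orig_func):
        # Refuse to run the wrapped method once the object is closed.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return wrapper

    class ClosableExample:
        def __init__(self):
            self.closed = False

        @before_close
        def read(self):
            return b"data"

        def close(self):
            self.closed = True

    f = ClosableExample()
    f.read()      # returns b"data"
    f.close()
    # f.read() would now raise ValueError("I/O operation on closed stream file")
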
  87
  88class ArvadosFileReaderBase(_FileLikeObjectBase):
  89    def __init__(self, name, mode, num_retries=None):
  90        super(ArvadosFileReaderBase, self).__init__(name, mode)
  91        self._filepos = 0
  92        self.num_retries = num_retries
  93        self._readline_cache = (None, None)
  94
  95    def __iter__(self):
  96        while True:
  97            data = self.readline()
  98            if not data:
  99                break
 100            yield data
 101
 102    def decompressed_name(self):
 103        return re.sub(r'\.(bz2|gz)$', '', self.name)
 104
 105    @_FileLikeObjectBase._before_close
 106    def seek(self, pos, whence=os.SEEK_SET):
 107        if whence == os.SEEK_CUR:
 108            pos += self._filepos
 109        elif whence == os.SEEK_END:
 110            pos += self.size()
 111        if pos < 0:
 112            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
 113        self._filepos = pos
 114        return self._filepos
 115
 116    def tell(self):
 117        return self._filepos
 118
 119    def readable(self):
 120        return True
 121
 122    def writable(self):
 123        return False
 124
 125    def seekable(self):
 126        return True
 127
 128    @_FileLikeObjectBase._before_close
 129    @retry_method
 130    def readall(self, size=2**20, num_retries=None):
 131        while True:
 132            data = self.read(size, num_retries=num_retries)
 133            if len(data) == 0:
 134                break
 135            yield data
 136
 137    @_FileLikeObjectBase._before_close
 138    @retry_method
 139    def readline(self, size=float('inf'), num_retries=None):
 140        cache_pos, cache_data = self._readline_cache
 141        if self.tell() == cache_pos:
 142            data = [cache_data]
 143            self._filepos += len(cache_data)
 144        else:
 145            data = [b'']
 146        data_size = len(data[-1])
 147        while (data_size < size) and (b'\n' not in data[-1]):
 148            next_read = self.read(2 ** 20, num_retries=num_retries)
 149            if not next_read:
 150                break
 151            data.append(next_read)
 152            data_size += len(next_read)
 153        data = b''.join(data)
 154        try:
 155            nextline_index = data.index(b'\n') + 1
 156        except ValueError:
 157            nextline_index = len(data)
 158        nextline_index = min(nextline_index, size)
 159        self._filepos -= len(data) - nextline_index
 160        self._readline_cache = (self.tell(), data[nextline_index:])
 161        return data[:nextline_index].decode()
 162
 163    @_FileLikeObjectBase._before_close
 164    @retry_method
 165    def decompress(self, decompress, size, num_retries=None):
 166        for segment in self.readall(size, num_retries=num_retries):
 167            data = decompress(segment)
 168            if data:
 169                yield data
 170
 171    @_FileLikeObjectBase._before_close
 172    @retry_method
 173    def readall_decompressed(self, size=2**20, num_retries=None):
 174        self.seek(0)
 175        if self.name.endswith('.bz2'):
 176            dc = bz2.BZ2Decompressor()
 177            return self.decompress(dc.decompress, size,
 178                                   num_retries=num_retries)
 179        elif self.name.endswith('.gz'):
 180            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
 181            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
 182                                   size, num_retries=num_retries)
 183        else:
 184            return self.readall(size, num_retries=num_retries)
 185
 186    @_FileLikeObjectBase._before_close
 187    @retry_method
 188    def readlines(self, sizehint=float('inf'), num_retries=None):
 189        data = []
 190        data_size = 0
 191        for s in self.readall(num_retries=num_retries):
 192            data.append(s)
 193            data_size += len(s)
 194            if data_size >= sizehint:
 195                break
 196        return b''.join(data).decode().splitlines(True)
 197
 198    def size(self):
 199        raise IOError(errno.ENOSYS, "Not implemented")
 200
 201    def read(self, size, num_retries=None):
 202        raise IOError(errno.ENOSYS, "Not implemented")
 203
 204    def readfrom(self, start, size, num_retries=None):
 205        raise IOError(errno.ENOSYS, "Not implemented")
 206
 207
 208class StreamFileReader(ArvadosFileReaderBase):
 209    class _NameAttribute(str):
 210        # The Python file API provides a plain .name attribute.
 211        # Older SDK provided a name() method.
 212        # This class provides both, for maximum compatibility.
 213        def __call__(self):
 214            return self
 215
 216    def __init__(self, stream, segments, name):
 217        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
 218        self._stream = stream
 219        self.segments = segments
 220
 221    def stream_name(self):
 222        return self._stream.name()
 223
 224    def size(self):
 225        n = self.segments[-1]
 226        return n.range_start + n.range_size
 227
 228    @_FileLikeObjectBase._before_close
 229    @retry_method
 230    def read(self, size, num_retries=None):
 231        """Read up to 'size' bytes from the stream, starting at the current file position"""
 232        if size == 0:
 233            return b''
 234
 235        data = b''
 236        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
 237        if available_chunks:
 238            lr = available_chunks[0]
 239            data = self._stream.readfrom(lr.locator+lr.segment_offset,
 240                                         lr.segment_size,
 241                                         num_retries=num_retries)
 242
 243        self._filepos += len(data)
 244        return data
 245
 246    @_FileLikeObjectBase._before_close
 247    @retry_method
 248    def readfrom(self, start, size, num_retries=None):
 249        """Read up to 'size' bytes from the stream, starting at 'start'"""
 250        if size == 0:
 251            return b''
 252
 253        data = []
 254        for lr in locators_and_ranges(self.segments, start, size):
 255            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
 256                                              num_retries=num_retries))
 257        return b''.join(data)
 258
 259    def as_manifest(self):
 260        segs = []
 261        for r in self.segments:
 262            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
 263        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
 264
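StreamFileReader._NameAttribute above is a str subclass that satisfies both the plain .name attribute convention of the Python file API and the older name() method. A standalone sketch of the idea, with an illustrative class name:

    class CallableName(str):
        # A string that can also be called, mimicking _NameAttribute.
        def __call__(self):
            return self

    name = CallableName("sample1.fastq")
    assert name == "sample1.fastq"     # behaves like a normal string attribute
    assert name() == "sample1.fastq"   # also supports the legacy name() call
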
 265
 266def synchronized(orig_func):
 267    @functools.wraps(orig_func)
 268    def synchronized_wrapper(self, *args, **kwargs):
 269        with self.lock:
 270            return orig_func(self, *args, **kwargs)
 271    return synchronized_wrapper
 272
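The synchronized decorator assumes the decorated object exposes a threading lock as self.lock. A minimal sketch of how a class can use it; the Counter class is hypothetical and not part of the SDK:

    import threading
    from arvados.arvfile import synchronized

    class Counter:
        def __init__(self):
            self.lock = threading.Lock()
            self.value = 0

        @synchronized
        def increment(self):
            # Runs with self.lock held, so concurrent calls are serialized.
            self.value += 1

    c = Counter()
    c.increment()
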
 273
 274class StateChangeError(Exception):
 275    def __init__(self, message, state, nextstate):
 276        super(StateChangeError, self).__init__(message)
 277        self.state = state
 278        self.nextstate = nextstate
 279
 280class _BufferBlock(object):
 281    """A stand-in for a Keep block that is in the process of being written.
 282
 283    Writers can append to it, get the size, and compute the Keep locator.
 284    There are three valid states:
 285
 286    WRITABLE
 287      Can append to block.
 288
 289    PENDING
 290      Block is in the process of being uploaded to Keep, append is an error.
 291
 292    COMMITTED
 293      The block has been written to Keep, its internal buffer has been
 294      released, fetching the block will fetch it via keep client (since we
 295      discarded the internal copy), and identifiers referring to the BufferBlock
 296      can be replaced with the block locator.
 297
 298    """
 299
 300    WRITABLE = 0
 301    PENDING = 1
 302    COMMITTED = 2
 303    ERROR = 3
 304    DELETED = 4
 305
 306    def __init__(self, blockid, starting_capacity, owner):
 307        """
 308        :blockid:
 309          the identifier for this block
 310
 311        :starting_capacity:
 312          the initial buffer capacity
 313
 314        :owner:
 315          ArvadosFile that owns this block
 316
 317        """
 318        self.blockid = blockid
 319        self.buffer_block = bytearray(starting_capacity)
 320        self.buffer_view = memoryview(self.buffer_block)
 321        self.write_pointer = 0
 322        self._state = _BufferBlock.WRITABLE
 323        self._locator = None
 324        self.owner = owner
 325        self.lock = threading.Lock()
 326        self.wait_for_commit = threading.Event()
 327        self.error = None
 328
 329    @synchronized
 330    def append(self, data):
 331        """Append some data to the buffer.
 332
 333        Only valid if the block is in WRITABLE state.  Implements an expanding
 334        buffer, doubling capacity as needed to accommodate all the data.
 335
 336        """
 337        if self._state == _BufferBlock.WRITABLE:
 338            if not isinstance(data, bytes) and not isinstance(data, memoryview):
 339                data = data.encode()
 340            while (self.write_pointer+len(data)) > len(self.buffer_block):
 341                new_buffer_block = bytearray(len(self.buffer_block) * 2)
 342                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
 343                self.buffer_block = new_buffer_block
 344                self.buffer_view = memoryview(self.buffer_block)
 345            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
 346            self.write_pointer += len(data)
 347            self._locator = None
 348        else:
 349            raise AssertionError("Buffer block is not writable")
 350
 351    STATE_TRANSITIONS = frozenset([
 352            (WRITABLE, PENDING),
 353            (PENDING, COMMITTED),
 354            (PENDING, ERROR),
 355            (ERROR, PENDING)])
 356
 357    @synchronized
 358    def set_state(self, nextstate, val=None):
 359        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
 360            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
 361        self._state = nextstate
 362
 363        if self._state == _BufferBlock.PENDING:
 364            self.wait_for_commit.clear()
 365
 366        if self._state == _BufferBlock.COMMITTED:
 367            self._locator = val
 368            self.buffer_view = None
 369            self.buffer_block = None
 370            self.wait_for_commit.set()
 371
 372        if self._state == _BufferBlock.ERROR:
 373            self.error = val
 374            self.wait_for_commit.set()
 375
 376    @synchronized
 377    def state(self):
 378        return self._state
 379
 380    def size(self):
 381        """The amount of data written to the buffer."""
 382        return self.write_pointer
 383
 384    @synchronized
 385    def locator(self):
 386        """The Keep locator for this buffer's contents."""
 387        if self._locator is None:
 388            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
 389        return self._locator
 390
 391    @synchronized
 392    def clone(self, new_blockid, owner):
 393        if self._state == _BufferBlock.COMMITTED:
 394            raise AssertionError("Cannot duplicate committed buffer block")
 395        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
 396        bufferblock.append(self.buffer_view[0:self.size()])
 397        return bufferblock
 398
 399    @synchronized
 400    def clear(self):
 401        self._state = _BufferBlock.DELETED
 402        self.owner = None
 403        self.buffer_block = None
 404        self.buffer_view = None
 405
 406    @synchronized
 407    def repack_writes(self):
 408        """Optimize buffer block by repacking segments in file sequence.
 409
 410        When the client makes random writes, they appear in the buffer block in
 411        the sequence they were written rather than the sequence they appear in
 412        the file.  This makes for inefficient, fragmented manifests.  Attempt
 413        to optimize by repacking writes in file sequence.
 414
 415        """
 416        if self._state != _BufferBlock.WRITABLE:
 417            raise AssertionError("Cannot repack non-writable block")
 418
 419        segs = self.owner.segments()
 420
 421        # Collect the segments that reference the buffer block.
 422        bufferblock_segs = [s for s in segs if s.locator == self.blockid]
 423
 424        # Collect total data referenced by segments (could be smaller than
 425        # bufferblock size if a portion of the file was written and
 426        # then overwritten).
 427        write_total = sum([s.range_size for s in bufferblock_segs])
 428
 429        if write_total < self.size() or len(bufferblock_segs) > 1:
 430            # If there's more than one segment referencing this block, it is
 431            # due to out-of-order writes and will produce a fragmented
 432            # manifest, so try to optimize by re-packing into a new buffer.
 433            contents = self.buffer_view[0:self.write_pointer].tobytes()
 434            new_bb = _BufferBlock(None, write_total, None)
 435            for t in bufferblock_segs:
 436                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
 437                t.segment_offset = new_bb.size() - t.range_size
 438
 439            self.buffer_block = new_bb.buffer_block
 440            self.buffer_view = new_bb.buffer_view
 441            self.write_pointer = new_bb.write_pointer
 442            self._locator = None
 443            new_bb.clear()
 444            self.owner.set_segments(segs)
 445
 446    def __repr__(self):
 447        return "<BufferBlock %s>" % (self.blockid)
 448
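_BufferBlock.append() above uses the classic doubling strategy for an expanding buffer. A standalone sketch of the same technique, stripped of the block state machine (names are illustrative):

    class GrowingBuffer:
        def __init__(self, starting_capacity=16):
            self.buffer_block = bytearray(starting_capacity)
            self.write_pointer = 0

        def append(self, data):
            # Double the capacity until the new data fits after the write pointer.
            while self.write_pointer + len(data) > len(self.buffer_block):
                new_block = bytearray(len(self.buffer_block) * 2)
                new_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_block
            self.buffer_block[self.write_pointer:self.write_pointer + len(data)] = data
            self.write_pointer += len(data)

        def contents(self):
            return bytes(self.buffer_block[0:self.write_pointer])

    buf = GrowingBuffer()
    buf.append(b"hello ")
    buf.append(b"world")
    assert buf.contents() == b"hello world"
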
 449
 450class NoopLock(object):
 451    def __enter__(self):
 452        return self
 453
 454    def __exit__(self, exc_type, exc_value, traceback):
 455        pass
 456
 457    def acquire(self, blocking=False):
 458        pass
 459
 460    def release(self):
 461        pass
 462
 463
 464def must_be_writable(orig_func):
 465    @functools.wraps(orig_func)
 466    def must_be_writable_wrapper(self, *args, **kwargs):
 467        if not self.writable():
 468            raise IOError(errno.EROFS, "Collection is read-only.")
 469        return orig_func(self, *args, **kwargs)
 470    return must_be_writable_wrapper
 471
 472
 473class _BlockManager(object):
 474    """BlockManager handles buffer blocks.
 475
 476    Also handles background block uploads, and background block prefetch for a
 477    Collection of ArvadosFiles.
 478
 479    """
 480
 481    DEFAULT_PUT_THREADS = 2
 482
 483    def __init__(self, keep,
 484                 copies=None,
 485                 put_threads=None,
 486                 num_retries=None,
 487                 storage_classes_func=None):
 488        """keep: KeepClient object to use"""
 489        self._keep = keep
 490        self._bufferblocks = collections.OrderedDict()
 491        self._put_queue = None
 492        self._put_threads = None
 493        self.lock = threading.Lock()
 494        self.prefetch_lookahead = self._keep.num_prefetch_threads
 495        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
 496        self.copies = copies
 497        self.storage_classes = storage_classes_func or (lambda: [])
 498        self._pending_write_size = 0
 499        self.threads_lock = threading.Lock()
 500        self.padding_block = None
 501        self.num_retries = num_retries
 502
 503    @synchronized
 504    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
 505        """Allocate a new, empty bufferblock in WRITABLE state and return it.
 506
 507        :blockid:
 508          optional block identifier, otherwise one will be automatically assigned
 509
 510        :starting_capacity:
 511          optional capacity, otherwise will use default capacity
 512
 513        :owner:
 514          ArvadosFile that owns this block
 515
 516        """
 517        return self._alloc_bufferblock(blockid, starting_capacity, owner)
 518
 519    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
 520        if blockid is None:
 521            blockid = str(uuid.uuid4())
 522        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
 523        self._bufferblocks[bufferblock.blockid] = bufferblock
 524        return bufferblock
 525
 526    @synchronized
 527    def dup_block(self, block, owner):
 528        """Create a new bufferblock initialized with the content of an existing bufferblock.
 529
 530        :block:
 531          the buffer block to copy.
 532
 533        :owner:
 534          ArvadosFile that owns the new block
 535
 536        """
 537        new_blockid = str(uuid.uuid4())
 538        bufferblock = block.clone(new_blockid, owner)
 539        self._bufferblocks[bufferblock.blockid] = bufferblock
 540        return bufferblock
 541
 542    @synchronized
 543    def is_bufferblock(self, locator):
 544        return locator in self._bufferblocks
 545
 546    def _commit_bufferblock_worker(self):
 547        """Background uploader thread."""
 548
 549        while True:
 550            try:
 551                bufferblock = self._put_queue.get()
 552                if bufferblock is None:
 553                    return
 554
 555                if self.copies is None:
 556                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
 557                else:
 558                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
 559                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 560            except Exception as e:
 561                bufferblock.set_state(_BufferBlock.ERROR, e)
 562            finally:
 563                if self._put_queue is not None:
 564                    self._put_queue.task_done()
 565
 566    def start_put_threads(self):
 567        with self.threads_lock:
 568            if self._put_threads is None:
 569                # Start uploader threads.
 570
 571                # If we don't limit the Queue size, the upload queue can quickly
 572                # grow to take up gigabytes of RAM if the writing process is
 573                # generating data more quickly than it can be sent to the Keep
 574                # servers.
 575                #
 576                # With two upload threads and a queue size of 2, this means up to 4
 577                # blocks pending.  If they are full 64 MiB blocks, that means up to
 578                # 256 MiB of internal buffering, which is the same size as the
 579                # default download block cache in KeepClient.
 580                self._put_queue = queue.Queue(maxsize=2)
 581
 582                self._put_threads = []
 583                for i in range(0, self.num_put_threads):
 584                    thread = threading.Thread(target=self._commit_bufferblock_worker)
 585                    self._put_threads.append(thread)
 586                    thread.daemon = True
 587                    thread.start()
 588
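The comment in start_put_threads() explains why the upload queue is bounded: producers block when the queue is full, which caps how much data sits buffered in RAM. A self-contained sketch of the same producer/consumer pattern, with a stand-in for keep.put() and one None sentinel per worker as in stop_threads():

    import queue
    import threading

    def worker(q, results):
        while True:
            item = q.get()
            try:
                if item is None:
                    return                      # sentinel: shut this worker down
                results.append(len(item))       # stand-in for uploading to Keep
            finally:
                q.task_done()

    q = queue.Queue(maxsize=2)                  # producers block when 2 items are pending
    results = []
    threads = [threading.Thread(target=worker, args=(q, results), daemon=True)
               for _ in range(2)]
    for t in threads:
        t.start()
    for chunk in (b"a" * 10, b"b" * 20, b"c" * 30):
        q.put(chunk)                            # may block here, bounding memory use
    for _ in threads:
        q.put(None)                             # one sentinel per worker
    for t in threads:
        t.join()
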
 589    @synchronized
 590    def stop_threads(self):
 591        """Shut down and wait for background upload and download threads to finish."""
 592
 593        if self._put_threads is not None:
 594            for t in self._put_threads:
 595                self._put_queue.put(None)
 596            for t in self._put_threads:
 597                t.join()
 598        self._put_threads = None
 599        self._put_queue = None
 600
 601    def __enter__(self):
 602        return self
 603
 604    def __exit__(self, exc_type, exc_value, traceback):
 605        self.stop_threads()
 606
 607    @synchronized
 608    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
 609        """Packs small blocks together before uploading"""
 610
 611        self._pending_write_size += closed_file_size
 612
 613        # Check whether there are enough small blocks to fill one block completely
 614        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
 615            return
 616
 617        # Find blocks that are ready to be packed together before being
 618        # committed to Keep.
 619        # A WRITABLE block always has an owner.
 620        # A WRITABLE block whose owner is closed() implies that its
 621        # size is <= KEEP_BLOCK_SIZE/2.
 622        try:
 623            small_blocks = [b for b in listvalues(self._bufferblocks)
 624                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
 625        except AttributeError:
 626            # Writable blocks without owner shouldn't exist.
 627            raise UnownedBlockError()
 628
 629        if len(small_blocks) <= 1:
 630            # Not enough small blocks for repacking
 631            return
 632
 633        for bb in small_blocks:
 634            bb.repack_writes()
 635
 636        # Update the pending write size count with its true value, just in case
 637        # some small file was opened, written and closed several times.
 638        self._pending_write_size = sum([b.size() for b in small_blocks])
 639
 640        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
 641            return
 642
 643        new_bb = self._alloc_bufferblock()
 644        new_bb.owner = []
 645        files = []
 646        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
 647            bb = small_blocks.pop(0)
 648            new_bb.owner.append(bb.owner)
 649            self._pending_write_size -= bb.size()
 650            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
 651            files.append((bb, new_bb.write_pointer - bb.size()))
 652
 653        self.commit_bufferblock(new_bb, sync=sync)
 654
 655        for bb, new_bb_segment_offset in files:
 656            newsegs = bb.owner.segments()
 657            for s in newsegs:
 658                if s.locator == bb.blockid:
 659                    s.locator = new_bb.blockid
 660                    s.segment_offset = new_bb_segment_offset+s.segment_offset
 661            bb.owner.set_segments(newsegs)
 662            self._delete_bufferblock(bb.blockid)
 663
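repack_small_blocks() above greedily packs the buffers of small closed files into a single block before it is committed to Keep, adjusting each file's segment offsets to point into the packed block. A simplified standalone sketch of the packing step (data and names are illustrative):

    KEEP_BLOCK_SIZE = 64 * 1024 * 1024        # mirrors config.KEEP_BLOCK_SIZE

    def pack_small_buffers(small_buffers, block_size=KEEP_BLOCK_SIZE):
        # Concatenate small buffers into one block, recording where each one starts.
        packed = bytearray()
        offsets = []
        for buf in small_buffers:
            if len(packed) + len(buf) > block_size:
                break
            offsets.append(len(packed))       # becomes the file's new segment_offset
            packed.extend(buf)
        return bytes(packed), offsets

    packed, offsets = pack_small_buffers([b"alpha", b"beta", b"gamma"])
    assert packed == b"alphabetagamma"
    assert offsets == [0, 5, 9]
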
 664    def commit_bufferblock(self, block, sync):
 665        """Initiate a background upload of a bufferblock.
 666
 667        :block:
 668          The block object to upload
 669
 670        :sync:
 671          If `sync` is True, upload the block synchronously.
 672          If `sync` is False, upload the block asynchronously.  This will
 673          return immediately unless the upload queue is at capacity, in
 674          which case it will wait on an upload queue slot.
 675
 676        """
 677        try:
 678            # Mark the block as PENDING to disallow any more appends.
 679            block.set_state(_BufferBlock.PENDING)
 680        except StateChangeError as e:
 681            if e.state == _BufferBlock.PENDING:
 682                if sync:
 683                    block.wait_for_commit.wait()
 684                else:
 685                    return
 686            if block.state() == _BufferBlock.COMMITTED:
 687                return
 688            elif block.state() == _BufferBlock.ERROR:
 689                raise block.error
 690            else:
 691                raise
 692
 693        if sync:
 694            try:
 695                if self.copies is None:
 696                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
 697                else:
 698                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
 699                block.set_state(_BufferBlock.COMMITTED, loc)
 700            except Exception as e:
 701                block.set_state(_BufferBlock.ERROR, e)
 702                raise
 703        else:
 704            self.start_put_threads()
 705            self._put_queue.put(block)
 706
 707    @synchronized
 708    def get_bufferblock(self, locator):
 709        return self._bufferblocks.get(locator)
 710
 711    @synchronized
 712    def get_padding_block(self):
 713        """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
 714        when using truncate() to extend the size of a file.
 715
 716        For reference (and possible future optimization), the md5sum of the
 717        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
 718
 719        """
 720
 721        if self.padding_block is None:
 722            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
 723            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
 724            self.commit_bufferblock(self.padding_block, False)
 725        return self.padding_block
 726
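The docstring above quotes the locator of the all-zero padding block. That value can be reproduced directly with hashlib; note that this sketch allocates 64 MiB of zeros:

    import hashlib

    KEEP_BLOCK_SIZE = 64 * 1024 * 1024        # 67108864, mirrors config.KEEP_BLOCK_SIZE
    padding = bytes(KEEP_BLOCK_SIZE)          # all zero bytes
    locator = "%s+%d" % (hashlib.md5(padding).hexdigest(), KEEP_BLOCK_SIZE)
    # Per the docstring above, locator == "7f614da9329cd3aebf59b91aadc30bf0+67108864"
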
 727    @synchronized
 728    def delete_bufferblock(self, locator):
 729        self._delete_bufferblock(locator)
 730
 731    def _delete_bufferblock(self, locator):
 732        if locator in self._bufferblocks:
 733            bb = self._bufferblocks[locator]
 734            bb.clear()
 735            del self._bufferblocks[locator]
 736
 737    def get_block_contents(self, locator, num_retries, cache_only=False):
 738        """Fetch a block.
 739
 740        First checks to see if the locator is a BufferBlock and returns its
 741        contents; if not, passes the request through to KeepClient.get().
 742
 743        """
 744        with self.lock:
 745            if locator in self._bufferblocks:
 746                bufferblock = self._bufferblocks[locator]
 747                if bufferblock.state() != _BufferBlock.COMMITTED:
 748                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
 749                else:
 750                    locator = bufferblock._locator
 751        if cache_only:
 752            return self._keep.get_from_cache(locator)
 753        else:
 754            return self._keep.get(locator, num_retries=num_retries)
 755
 756    def commit_all(self):
 757        """Commit all outstanding buffer blocks.
 758
 759        This is a synchronous call, and will not return until all buffer blocks
 760        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 761
 762        """
 763        self.repack_small_blocks(force=True, sync=True)
 764
 765        with self.lock:
 766            items = listitems(self._bufferblocks)
 767
 768        for k,v in items:
 769            if v.state() != _BufferBlock.COMMITTED and v.owner:
 770                # Ignore blocks with a list of owners: if they're not in COMMITTED
 771                # state, they're already being committed asynchronously.
 772                if isinstance(v.owner, ArvadosFile):
 773                    v.owner.flush(sync=False)
 774
 775        with self.lock:
 776            if self._put_queue is not None:
 777                self._put_queue.join()
 778
 779                err = []
 780                for k,v in items:
 781                    if v.state() == _BufferBlock.ERROR:
 782                        err.append((v.locator(), v.error))
 783                if err:
 784                    raise KeepWriteError("Error writing some blocks", err, label="block")
 785
 786        for k,v in items:
 787            # flush again with sync=True to remove committed bufferblocks from
 788            # the segments.
 789            if v.owner:
 790                if isinstance(v.owner, ArvadosFile):
 791                    v.owner.flush(sync=True)
 792                elif isinstance(v.owner, list) and len(v.owner) > 0:
 793                    # This bufferblock is referenced by many files as a result
 794                    # of repacking small blocks, so don't delete it when flushing
 795                    # its owners, just do it after flushing them all.
 796                    for owner in v.owner:
 797                        owner.flush(sync=True)
 798                    self.delete_bufferblock(k)
 799
 800        self.stop_threads()
 801
 802    def block_prefetch(self, locator):
 803        """Initiate a background download of a block.
 804        """
 805
 806        if not self.prefetch_lookahead:
 807            return
 808
 809        with self.lock:
 810            if locator in self._bufferblocks:
 811                return
 812
 813        self._keep.block_prefetch(locator)
 814
 815
 816class ArvadosFile(object):
 817    """Represent a file in a Collection.
 818
 819    ArvadosFile manages the underlying representation of a file in Keep as a
 820    sequence of segments spanning a set of blocks, and implements random
 821    read/write access.
 822
 823    This object may be accessed from multiple threads.
 824
 825    """
 826
 827    __slots__ = ('parent', 'name', '_writers', '_committed',
 828                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 829
 830    def __init__(self, parent, name, stream=[], segments=[]):
 831        """
 832        ArvadosFile constructor.
 833
 834        :stream:
 835          a list of Range objects representing a block stream
 836
 837        :segments:
 838          a list of Range objects representing segments
 839        """
 840        self.parent = parent
 841        self.name = name
 842        self._writers = set()
 843        self._committed = False
 844        self._segments = []
 845        self.lock = parent.root_collection().lock
 846        for s in segments:
 847            self._add_segment(stream, s.locator, s.range_size)
 848        self._current_bblock = None
 849        self._read_counter = 0
 850
 851    def writable(self):
 852        return self.parent.writable()
 853
 854    @synchronized
 855    def permission_expired(self, as_of_dt=None):
 856        """Returns True if any of the segments' locators has expired"""
 857        for r in self._segments:
 858            if KeepLocator(r.locator).permission_expired(as_of_dt):
 859                return True
 860        return False
 861
 862    @synchronized
 863    def has_remote_blocks(self):
 864        """Returns True if any of the segments' locators has a +R signature"""
 865
 866        for s in self._segments:
 867            if '+R' in s.locator:
 868                return True
 869        return False
 870
 871    @synchronized
 872    def _copy_remote_blocks(self, remote_blocks={}):
 873        """Ask Keep to copy remote blocks and point to their local copies.
 874
 875        This is called from the parent Collection.
 876
 877        :remote_blocks:
 878            Shared cache of remote to local block mappings. This is used to avoid
 879            doing extra work when blocks are shared by more than one file in
 880            different subdirectories.
 881        """
 882
 883        for s in self._segments:
 884            if '+R' in s.locator:
 885                try:
 886                    loc = remote_blocks[s.locator]
 887                except KeyError:
 888                    loc = self.parent._my_keep().refresh_signature(s.locator)
 889                    remote_blocks[s.locator] = loc
 890                s.locator = loc
 891                self.parent.set_committed(False)
 892        return remote_blocks
 893
 894    @synchronized
 895    def segments(self):
 896        return copy.copy(self._segments)
 897
 898    @synchronized
 899    def clone(self, new_parent, new_name):
 900        """Make a copy of this file."""
 901        cp = ArvadosFile(new_parent, new_name)
 902        cp.replace_contents(self)
 903        return cp
 904
 905    @must_be_writable
 906    @synchronized
 907    def replace_contents(self, other):
 908        """Replace segments of this file with segments from another `ArvadosFile` object."""
 909
 910        map_loc = {}
 911        self._segments = []
 912        for other_segment in other.segments():
 913            new_loc = other_segment.locator
 914            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
 915                if other_segment.locator not in map_loc:
 916                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
 917                    if bufferblock.state() != _BufferBlock.WRITABLE:
 918                        map_loc[other_segment.locator] = bufferblock.locator()
 919                    else:
 920                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
 921                new_loc = map_loc[other_segment.locator]
 922
 923            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 924
 925        self.set_committed(False)
 926
 927    def __eq__(self, other):
 928        if other is self:
 929            return True
 930        if not isinstance(other, ArvadosFile):
 931            return False
 932
 933        othersegs = other.segments()
 934        with self.lock:
 935            if len(self._segments) != len(othersegs):
 936                return False
 937            for i in range(0, len(othersegs)):
 938                seg1 = self._segments[i]
 939                seg2 = othersegs[i]
 940                loc1 = seg1.locator
 941                loc2 = seg2.locator
 942
 943                if self.parent._my_block_manager().is_bufferblock(loc1):
 944                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
 945
 946                if other.parent._my_block_manager().is_bufferblock(loc2):
 947                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
 948
 949                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
 950                    seg1.range_start != seg2.range_start or
 951                    seg1.range_size != seg2.range_size or
 952                    seg1.segment_offset != seg2.segment_offset):
 953                    return False
 954
 955        return True
 956
 957    def __ne__(self, other):
 958        return not self.__eq__(other)
 959
 960    @synchronized
 961    def set_segments(self, segs):
 962        self._segments = segs
 963
 964    @synchronized
 965    def set_committed(self, value=True):
 966        """Set committed flag.
 967
 968        If value is True, set committed to be True.
 969
 970        If value is False, set committed to be False for this and all parents.
 971        """
 972        if value == self._committed:
 973            return
 974        self._committed = value
 975        if self._committed is False and self.parent is not None:
 976            self.parent.set_committed(False)
 977
 978    @synchronized
 979    def committed(self):
 980        """Get whether this is committed or not."""
 981        return self._committed
 982
 983    @synchronized
 984    def add_writer(self, writer):
 985        """Add an ArvadosFileWriter reference to the list of writers"""
 986        if isinstance(writer, ArvadosFileWriter):
 987            self._writers.add(writer)
 988
 989    @synchronized
 990    def remove_writer(self, writer, flush):
 991        """
 992        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 993        and do some block maintenance tasks.
 994        """
 995        self._writers.remove(writer)
 996
 997        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 998            # File writer closed, not small enough for repacking
 999            self.flush()
1000        elif self.closed():
1001            # All writers closed and size is adequate for repacking
1002            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1003
1004    def closed(self):
1005        """
1006        Get whether this is closed or not. When the writers list is empty, the file
1007        is considered closed.
1008        """
1009        return len(self._writers) == 0
1010
1011    @must_be_writable
1012    @synchronized
1013    def truncate(self, size):
1014        """Shrink or expand the size of the file.
1015
1016        If `size` is less than the size of the file, the file contents after
1017        `size` will be discarded.  If `size` is greater than the current size
1018        of the file, it will be filled with zero bytes.
1019
1020        """
1021        if size < self.size():
1022            new_segs = []
1023            for r in self._segments:
1024                range_end = r.range_start+r.range_size
1025                if r.range_start >= size:
1026                    # segment is past the truncate size, all done
1027                    break
1028                elif size < range_end:
1029                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1030                    nr.segment_offset = r.segment_offset
1031                    new_segs.append(nr)
1032                    break
1033                else:
1034                    new_segs.append(r)
1035
1036            self._segments = new_segs
1037            self.set_committed(False)
1038        elif size > self.size():
1039            padding = self.parent._my_block_manager().get_padding_block()
1040            diff = size - self.size()
1041            while diff > config.KEEP_BLOCK_SIZE:
1042                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1043                diff -= config.KEEP_BLOCK_SIZE
1044            if diff > 0:
1045                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1046            self.set_committed(False)
1047        else:
1048            # size == self.size()
1049            pass
1050
1051    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
1052        """Read up to `size` bytes from the file starting at `offset`.
1053
1054        Arguments:
1055
1056        * exact: bool --- If False (default), return less data than
1057          requested if the read crosses a block boundary and the next
1058          block isn't cached.  If True, only return less data than
1059          requested when hitting EOF.
1060
1061        * return_memoryview: bool --- If False (default) return a
1062          `bytes` object, which may entail making a copy in some
1063          situations.  If True, return a `memoryview` object which may
1064          avoid making a copy, but may be incompatible with code
1065          expecting a `bytes` object.
1066
1067        """
1068
1069        with self.lock:
1070            if size == 0 or offset >= self.size():
1071                return memoryview(b'') if return_memoryview else b''
1072            readsegs = locators_and_ranges(self._segments, offset, size)
1073
1074            prefetch = None
1075            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1076            if prefetch_lookahead:
1077                # Doing prefetch on every read() call is surprisingly expensive
1078                # when we're trying to deliver data at 600+ MiBps and want
1079                # the read() fast path to be as lightweight as possible.
1080                #
1081                # Only prefetching every 128 read operations
1082                # dramatically reduces the overhead while still
1083                # getting the benefit of prefetching (e.g. when
1084                # reading 128 KiB at a time, it checks for prefetch
1085                # every 16 MiB).
1086                self._read_counter = (self._read_counter+1) % 128
1087                if self._read_counter == 1:
1088                    prefetch = locators_and_ranges(self._segments,
1089                                                   offset + size,
1090                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1091                                                   limit=(1+prefetch_lookahead))
1092
1093        locs = set()
1094        data = []
1095        for lr in readsegs:
1096            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1097            if block:
1098                blockview = memoryview(block)
1099                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1100                locs.add(lr.locator)
1101            else:
1102                break
1103
1104        if prefetch:
1105            for lr in prefetch:
1106                if lr.locator not in locs:
1107                    self.parent._my_block_manager().block_prefetch(lr.locator)
1108                    locs.add(lr.locator)
1109
1110        if len(data) == 1:
1111            return data[0] if return_memoryview else data[0].tobytes()
1112        else:
1113            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1114
1115
1116    @must_be_writable
1117    @synchronized
1118    def writeto(self, offset, data, num_retries):
1119        """Write `data` to the file starting at `offset`.
1120
1121        This will update existing bytes and/or extend the size of the file as
1122        necessary.
1123
1124        """
1125        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1126            data = data.encode()
1127        if len(data) == 0:
1128            return
1129
1130        if offset > self.size():
1131            self.truncate(offset)
1132
1133        if len(data) > config.KEEP_BLOCK_SIZE:
1134            # Chunk it up into smaller writes
1135            n = 0
1136            dataview = memoryview(data)
1137            while n < len(data):
1138                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1139                n += config.KEEP_BLOCK_SIZE
1140            return
1141
1142        self.set_committed(False)
1143
1144        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1145            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1146
1147        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1148            self._current_bblock.repack_writes()
1149            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1150                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1151                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1152
1153        self._current_bblock.append(data)
1154
1155        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1156
1157        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1158
1159        return len(data)
1160
1161    @synchronized
1162    def flush(self, sync=True, num_retries=0):
1163        """Flush the current bufferblock to Keep.
1164
1165        :sync:
1166          If True, commit block synchronously, wait until buffer block has been written.
1167          If False, commit block asynchronously, return immediately after putting block into
1168          the keep put queue.
1169        """
1170        if self.committed():
1171            return
1172
1173        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1174            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1175                self._current_bblock.repack_writes()
1176            if self._current_bblock.state() != _BufferBlock.DELETED:
1177                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1178
1179        if sync:
1180            to_delete = set()
1181            for s in self._segments:
1182                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1183                if bb:
1184                    if bb.state() != _BufferBlock.COMMITTED:
1185                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1186                    to_delete.add(s.locator)
1187                    s.locator = bb.locator()
1188            for s in to_delete:
1189                # Don't delete the bufferblock if it's owned by many files. It'll be
1190                # deleted after all of its owners are flush()ed.
1191                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1192                    self.parent._my_block_manager().delete_bufferblock(s)
1193
1194        self.parent.notify(MOD, self.parent, self.name, (self, self))
1195
1196    @must_be_writable
1197    @synchronized
1198    def add_segment(self, blocks, pos, size):
1199        """Add a segment to the end of the file.
1200
1201        `pos` and `size` reference a section of the stream described by
1202        `blocks` (a list of Range objects)
1203
1204        """
1205        self._add_segment(blocks, pos, size)
1206
1207    def _add_segment(self, blocks, pos, size):
1208        """Internal implementation of add_segment."""
1209        self.set_committed(False)
1210        for lr in locators_and_ranges(blocks, pos, size):
1211            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1212            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1213            self._segments.append(r)
1214
1215    @synchronized
1216    def size(self):
1217        """Get the file size."""
1218        if self._segments:
1219            n = self._segments[-1]
1220            return n.range_start + n.range_size
1221        else:
1222            return 0
1223
1224    @synchronized
1225    def manifest_text(self, stream_name=".", portable_locators=False,
1226                      normalize=False, only_committed=False):
1227        buf = ""
1228        filestream = []
1229        for segment in self._segments:
1230            loc = segment.locator
1231            if self.parent._my_block_manager().is_bufferblock(loc):
1232                if only_committed:
1233                    continue
1234                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1235            if portable_locators:
1236                loc = KeepLocator(loc).stripped()
1237            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1238                                 segment.segment_offset, segment.range_size))
1239        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1240        buf += "\n"
1241        return buf
1242
1243    @must_be_writable
1244    @synchronized
1245    def _reparent(self, newparent, newname):
1246        self.set_committed(False)
1247        self.flush(sync=True)
1248        self.parent.remove(self.name)
1249        self.parent = newparent
1250        self.name = newname
1251        self.lock = self.parent.root_collection().lock
1252
1253
1254class ArvadosFileReader(ArvadosFileReaderBase):
1255    """Wraps ArvadosFile in a file-like object supporting reading only.
1256
1257    Be aware that this class is NOT thread safe as there is no locking around
1258    updates to the file pointer.
1259
1260    """
1261
1262    def __init__(self, arvadosfile, mode="r", num_retries=None):
1263        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1264        self.arvadosfile = arvadosfile
1265
1266    def size(self):
1267        return self.arvadosfile.size()
1268
1269    def stream_name(self):
1270        return self.arvadosfile.parent.stream_name()
1271
1272    def readinto(self, b):
1273        data = self.read(len(b))
1274        b[:len(data)] = data
1275        return len(data)
1276
1277    @_FileLikeObjectBase._before_close
1278    @retry_method
1279    def read(self, size=-1, num_retries=None, return_memoryview=False):
1280        """Read up to `size` bytes from the file and return the result.
1281
1282        Starts at the current file position.  If `size` is negative or None,
1283        read the entire remainder of the file.
1284
1285        Returns an empty bytes object if the file pointer is at the end of the file.
1286
1287        Returns a `bytes` object, unless `return_memoryview` is True,
1288        in which case it returns a memory view, which may avoid an
1289        unnecessary data copy in some situations.
1290
1291        """
1292        if size is None or size < 0:
1293            data = []
1294            #
1295            # specify exact=False, return_memoryview=True here so that we
1296            # only copy data once into the final buffer.
1297            #
1298            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1299            while rd:
1300                data.append(rd)
1301                self._filepos += len(rd)
1302                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1303            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1304        else:
1305            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1306            self._filepos += len(data)
1307            return data
1308
1309    @_FileLikeObjectBase._before_close
1310    @retry_method
1311    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1312        """Read up to `size` bytes from the stream, starting at the specified file offset.
1313
1314        This method does not change the file position.
1315
1316        Returns a `bytes` object, unless `return_memoryview` is True,
1317        in which case it returns a memory view, which may avoid an
1318        unnecessary data copy in some situations.
1319
1320        """
1321        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
1322
1323    def flush(self):
1324        pass
1325
1326
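In typical SDK use an ArvadosFileReader is not constructed directly; it backs the file-like object returned when a collection is opened for reading. A hedged sketch, assuming a configured Arvados client and a collection (identified here by a made-up UUID) that contains sample1.fastq:

    import arvados.collection

    c = arvados.collection.Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')   # hypothetical collection
    with c.open('sample1.fastq', 'rb') as reader:
        first_chunk = reader.read(2 ** 20)     # up to 1 MiB from the current position
        reader.seek(0)                         # random access is supported
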
1327class ArvadosFileWriter(ArvadosFileReader):
1328    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1329
1330    Be aware that this class is NOT thread safe as there is no locking around
1331    updates to the file pointer.
1332
1333    """
1334
1335    def __init__(self, arvadosfile, mode, num_retries=None):
1336        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1337        self.arvadosfile.add_writer(self)
1338
1339    def writable(self):
1340        return True
1341
1342    @_FileLikeObjectBase._before_close
1343    @retry_method
1344    def write(self, data, num_retries=None):
1345        if self.mode[0] == "a":
1346            self._filepos = self.size()
1347        self.arvadosfile.writeto(self._filepos, data, num_retries)
1348        self._filepos += len(data)
1349        return len(data)
1350
1351    @_FileLikeObjectBase._before_close
1352    @retry_method
1353    def writelines(self, seq, num_retries=None):
1354        for s in seq:
1355            self.write(s, num_retries=num_retries)
1356
1357    @_FileLikeObjectBase._before_close
1358    def truncate(self, size=None):
1359        if size is None:
1360            size = self._filepos
1361        self.arvadosfile.truncate(size)
1362
1363    @_FileLikeObjectBase._before_close
1364    def flush(self):
1365        self.arvadosfile.flush()
1366
1367    def close(self, flush=True):
1368        if not self.closed:
1369            self.arvadosfile.remove_writer(self, flush)
1370            super(ArvadosFileWriter, self).close()
1371
1372
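ArvadosFileWriter likewise backs the writable file-like object returned by a collection opened for writing. A hedged sketch, assuming a configured Arvados client with permission to create collections; file and collection names are made up:

    import arvados.collection

    c = arvados.collection.Collection()                  # new, empty, writable collection
    with c.open('notes.txt', 'w') as writer:
        writer.write('first line\n')
        writer.writelines(['second line\n', 'third line\n'])
    c.save_new(name='arvfile writer example')             # commits buffered blocks and saves
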
1373class WrappableFile(object):
1374    """An interface to an Arvados file that's compatible with io wrappers.
1375
1376    """
1377    def __init__(self, f):
1378        self.f = f
1379        self.closed = False
1380    def close(self):
1381        self.closed = True
1382        return self.f.close()
1383    def flush(self):
1384        return self.f.flush()
1385    def read(self, *args, **kwargs):
1386        return self.f.read(*args, **kwargs)
1387    def readable(self):
1388        return self.f.readable()
1389    def readinto(self, *args, **kwargs):
1390        return self.f.readinto(*args, **kwargs)
1391    def seek(self, *args, **kwargs):
1392        return self.f.seek(*args, **kwargs)
1393    def seekable(self):
1394        return self.f.seekable()
1395    def tell(self):
1396        return self.f.tell()
1397    def writable(self):
1398        return self.f.writable()
1399    def write(self, *args, **kwargs):
1400        return self.f.write(*args, **kwargs)
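
WrappableFile exposes just the subset of the file protocol that the io wrappers expect, so an Arvados file can be given buffered or text-mode behaviour. A minimal sketch, assuming arv_file is an ArvadosFileReader obtained elsewhere:

    import io

    wrapped = io.BufferedReader(WrappableFile(arv_file))
    header = wrapped.read(1024)    # buffered binary reads over the Arvados file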
MOD = 'mod'
WRITE = 'write'
def split(path):
40def split(path):
41    """split(path) -> streamname, filename
42
43    Separate the stream name and file name in a /-separated stream path and
44    return a tuple (stream_name, file_name).  If no stream name is available,
45    assume '.'.
46
47    """
48    try:
49        stream_name, file_name = path.rsplit('/', 1)
50    except ValueError:  # No / in string
51        stream_name, file_name = '.', path
52    return stream_name, file_name

split(path) -> streamname, filename

Separate the stream name and file name in a /-separated stream path and return a tuple (stream_name, file_name). If no stream name is available, assume ‘.’.

class UnownedBlockError(builtins.Exception):
55class UnownedBlockError(Exception):
56    """Raised when there's an writable block without an owner on the BlockManager."""
57    pass

Raised when there’s an writable block without an owner on the BlockManager.

class ArvadosFileReaderBase(_FileLikeObjectBase):
 89class ArvadosFileReaderBase(_FileLikeObjectBase):
 90    def __init__(self, name, mode, num_retries=None):
 91        super(ArvadosFileReaderBase, self).__init__(name, mode)
 92        self._filepos = 0
 93        self.num_retries = num_retries
 94        self._readline_cache = (None, None)
 95
 96    def __iter__(self):
 97        while True:
 98            data = self.readline()
 99            if not data:
100                break
101            yield data
102
103    def decompressed_name(self):
104        return re.sub(r'\.(bz2|gz)$', '', self.name)
105
106    @_FileLikeObjectBase._before_close
107    def seek(self, pos, whence=os.SEEK_SET):
108        if whence == os.SEEK_CUR:
109            pos += self._filepos
110        elif whence == os.SEEK_END:
111            pos += self.size()
112        if pos < 0:
113            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
114        self._filepos = pos
115        return self._filepos
116
117    def tell(self):
118        return self._filepos
119
120    def readable(self):
121        return True
122
123    def writable(self):
124        return False
125
126    def seekable(self):
127        return True
128
129    @_FileLikeObjectBase._before_close
130    @retry_method
131    def readall(self, size=2**20, num_retries=None):
132        while True:
133            data = self.read(size, num_retries=num_retries)
134            if len(data) == 0:
135                break
136            yield data
137
138    @_FileLikeObjectBase._before_close
139    @retry_method
140    def readline(self, size=float('inf'), num_retries=None):
141        cache_pos, cache_data = self._readline_cache
142        if self.tell() == cache_pos:
143            data = [cache_data]
144            self._filepos += len(cache_data)
145        else:
146            data = [b'']
147        data_size = len(data[-1])
148        while (data_size < size) and (b'\n' not in data[-1]):
149            next_read = self.read(2 ** 20, num_retries=num_retries)
150            if not next_read:
151                break
152            data.append(next_read)
153            data_size += len(next_read)
154        data = b''.join(data)
155        try:
156            nextline_index = data.index(b'\n') + 1
157        except ValueError:
158            nextline_index = len(data)
159        nextline_index = min(nextline_index, size)
160        self._filepos -= len(data) - nextline_index
161        self._readline_cache = (self.tell(), data[nextline_index:])
162        return data[:nextline_index].decode()
163
164    @_FileLikeObjectBase._before_close
165    @retry_method
166    def decompress(self, decompress, size, num_retries=None):
167        for segment in self.readall(size, num_retries=num_retries):
168            data = decompress(segment)
169            if data:
170                yield data
171
172    @_FileLikeObjectBase._before_close
173    @retry_method
174    def readall_decompressed(self, size=2**20, num_retries=None):
175        self.seek(0)
176        if self.name.endswith('.bz2'):
177            dc = bz2.BZ2Decompressor()
178            return self.decompress(dc.decompress, size,
179                                   num_retries=num_retries)
180        elif self.name.endswith('.gz'):
181            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
182            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
183                                   size, num_retries=num_retries)
184        else:
185            return self.readall(size, num_retries=num_retries)
186
187    @_FileLikeObjectBase._before_close
188    @retry_method
189    def readlines(self, sizehint=float('inf'), num_retries=None):
190        data = []
191        data_size = 0
192        for s in self.readall(num_retries=num_retries):
193            data.append(s)
194            data_size += len(s)
195            if data_size >= sizehint:
196                break
197        return b''.join(data).decode().splitlines(True)
198
199    def size(self):
200        raise IOError(errno.ENOSYS, "Not implemented")
201
202    def read(self, size, num_retries=None):
203        raise IOError(errno.ENOSYS, "Not implemented")
204
205    def readfrom(self, start, size, num_retries=None):
206        raise IOError(errno.ENOSYS, "Not implemented")
ArvadosFileReaderBase(name, mode, num_retries=None)
90    def __init__(self, name, mode, num_retries=None):
91        super(ArvadosFileReaderBase, self).__init__(name, mode)
92        self._filepos = 0
93        self.num_retries = num_retries
94        self._readline_cache = (None, None)
num_retries
def decompressed_name(self):
103    def decompressed_name(self):
104        return re.sub(r'\.(bz2|gz)$', '', self.name)
def seek(self, pos, whence=0):
106    @_FileLikeObjectBase._before_close
107    def seek(self, pos, whence=os.SEEK_SET):
108        if whence == os.SEEK_CUR:
109            pos += self._filepos
110        elif whence == os.SEEK_END:
111            pos += self.size()
112        if pos < 0:
113            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
114        self._filepos = pos
115        return self._filepos
def tell(self):
117    def tell(self):
118        return self._filepos
def readable(self):
120    def readable(self):
121        return True
def writable(self):
123    def writable(self):
124        return False
def seekable(self):
126    def seekable(self):
127        return True
@retry_method
def readall(self, size=1048576, num_retries=None):
129    @_FileLikeObjectBase._before_close
130    @retry_method
131    def readall(self, size=2**20, num_retries=None):
132        while True:
133            data = self.read(size, num_retries=num_retries)
134            if len(data) == 0:
135                break
136            yield data
@retry_method
def readline(self, size=inf, num_retries=None):
138    @_FileLikeObjectBase._before_close
139    @retry_method
140    def readline(self, size=float('inf'), num_retries=None):
141        cache_pos, cache_data = self._readline_cache
142        if self.tell() == cache_pos:
143            data = [cache_data]
144            self._filepos += len(cache_data)
145        else:
146            data = [b'']
147        data_size = len(data[-1])
148        while (data_size < size) and (b'\n' not in data[-1]):
149            next_read = self.read(2 ** 20, num_retries=num_retries)
150            if not next_read:
151                break
152            data.append(next_read)
153            data_size += len(next_read)
154        data = b''.join(data)
155        try:
156            nextline_index = data.index(b'\n') + 1
157        except ValueError:
158            nextline_index = len(data)
159        nextline_index = min(nextline_index, size)
160        self._filepos -= len(data) - nextline_index
161        self._readline_cache = (self.tell(), data[nextline_index:])
162        return data[:nextline_index].decode()
@retry_method
def decompress(self, decompress, size, num_retries=None):
164    @_FileLikeObjectBase._before_close
165    @retry_method
166    def decompress(self, decompress, size, num_retries=None):
167        for segment in self.readall(size, num_retries=num_retries):
168            data = decompress(segment)
169            if data:
170                yield data
@retry_method
def readall_decompressed(self, size=1048576, num_retries=None):
172    @_FileLikeObjectBase._before_close
173    @retry_method
174    def readall_decompressed(self, size=2**20, num_retries=None):
175        self.seek(0)
176        if self.name.endswith('.bz2'):
177            dc = bz2.BZ2Decompressor()
178            return self.decompress(dc.decompress, size,
179                                   num_retries=num_retries)
180        elif self.name.endswith('.gz'):
181            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
182            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
183                                   size, num_retries=num_retries)
184        else:
185            return self.readall(size, num_retries=num_retries)
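
As a usage sketch (the variable `reader` below is an assumption standing in for any instance of this class or a subclass, e.g. a file opened from a collection), readall_decompressed() picks a decompressor from the file name suffix and yields decompressed chunks:

    # Hedged sketch: write the decompressed contents of a '.gz' file to local disk.
    with open('calls.vcf', 'wb') as out:
        for chunk in reader.readall_decompressed(size=2**20):
            out.write(chunk)    # names without .gz/.bz2 pass through uncompressed
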
@retry_method
def readlines(self, sizehint=inf, num_retries=None):
187    @_FileLikeObjectBase._before_close
188    @retry_method
189    def readlines(self, sizehint=float('inf'), num_retries=None):
190        data = []
191        data_size = 0
192        for s in self.readall(num_retries=num_retries):
193            data.append(s)
194            data_size += len(s)
195            if data_size >= sizehint:
196                break
197        return b''.join(data).decode().splitlines(True)
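
Line-oriented reads decode to str, and iterating the reader itself repeatedly calls readline() until it returns an empty string. A minimal sketch with the same assumed `reader`:

    header = reader.readline()     # one decoded line, trailing '\n' included when present
    for line in reader:            # __iter__ delegates to readline()
        pass                       # replace with the caller's own per-line handling
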
def size(self):
199    def size(self):
200        raise IOError(errno.ENOSYS, "Not implemented")
def read(self, size, num_retries=None):
202    def read(self, size, num_retries=None):
203        raise IOError(errno.ENOSYS, "Not implemented")
def readfrom(self, start, size, num_retries=None):
205    def readfrom(self, start, size, num_retries=None):
206        raise IOError(errno.ENOSYS, "Not implemented")
class StreamFileReader(ArvadosFileReaderBase):
209class StreamFileReader(ArvadosFileReaderBase):
210    class _NameAttribute(str):
211        # The Python file API provides a plain .name attribute.
212        # Older SDK provided a name() method.
213        # This class provides both, for maximum compatibility.
214        def __call__(self):
215            return self
216
217    def __init__(self, stream, segments, name):
218        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
219        self._stream = stream
220        self.segments = segments
221
222    def stream_name(self):
223        return self._stream.name()
224
225    def size(self):
226        n = self.segments[-1]
227        return n.range_start + n.range_size
228
229    @_FileLikeObjectBase._before_close
230    @retry_method
231    def read(self, size, num_retries=None):
232        """Read up to 'size' bytes from the stream, starting at the current file position"""
233        if size == 0:
234            return b''
235
236        data = b''
237        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
238        if available_chunks:
239            lr = available_chunks[0]
240            data = self._stream.readfrom(lr.locator+lr.segment_offset,
241                                         lr.segment_size,
242                                         num_retries=num_retries)
243
244        self._filepos += len(data)
245        return data
246
247    @_FileLikeObjectBase._before_close
248    @retry_method
249    def readfrom(self, start, size, num_retries=None):
250        """Read up to 'size' bytes from the stream, starting at 'start'"""
251        if size == 0:
252            return b''
253
254        data = []
255        for lr in locators_and_ranges(self.segments, start, size):
256            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
257                                              num_retries=num_retries))
258        return b''.join(data)
259
260    def as_manifest(self):
261        segs = []
262        for r in self.segments:
263            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
264        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
StreamFileReader(stream, segments, name)
217    def __init__(self, stream, segments, name):
218        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
219        self._stream = stream
220        self.segments = segments
segments
def stream_name(self):
222    def stream_name(self):
223        return self._stream.name()
def size(self):
225    def size(self):
226        n = self.segments[-1]
227        return n.range_start + n.range_size
@retry_method
def read(self, size, num_retries=None):
229    @_FileLikeObjectBase._before_close
230    @retry_method
231    def read(self, size, num_retries=None):
232        """Read up to 'size' bytes from the stream, starting at the current file position"""
233        if size == 0:
234            return b''
235
236        data = b''
237        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
238        if available_chunks:
239            lr = available_chunks[0]
240            data = self._stream.readfrom(lr.locator+lr.segment_offset,
241                                         lr.segment_size,
242                                         num_retries=num_retries)
243
244        self._filepos += len(data)
245        return data

Read up to ‘size’ bytes from the stream, starting at the current file position

@retry_method
def readfrom(self, start, size, num_retries=None):
247    @_FileLikeObjectBase._before_close
248    @retry_method
249    def readfrom(self, start, size, num_retries=None):
250        """Read up to 'size' bytes from the stream, starting at 'start'"""
251        if size == 0:
252            return b''
253
254        data = []
255        for lr in locators_and_ranges(self.segments, start, size):
256            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
257                                              num_retries=num_retries))
258        return b''.join(data)

Read up to ‘size’ bytes from the stream, starting at ‘start’

def as_manifest(self):
260    def as_manifest(self):
261        segs = []
262        for r in self.segments:
263            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
264        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
def synchronized(orig_func):
267def synchronized(orig_func):
268    @functools.wraps(orig_func)
269    def synchronized_wrapper(self, *args, **kwargs):
270        with self.lock:
271            return orig_func(self, *args, **kwargs)
272    return synchronized_wrapper
class StateChangeError(builtins.Exception):
275class StateChangeError(Exception):
276    def __init__(self, message, state, nextstate):
277        super(StateChangeError, self).__init__(message)
278        self.state = state
279        self.nextstate = nextstate

Exception describing a failed state change: along with the error message, it records the current state and the attempted next state.

StateChangeError(message, state, nextstate)
276    def __init__(self, message, state, nextstate):
277        super(StateChangeError, self).__init__(message)
278        self.state = state
279        self.nextstate = nextstate
state
nextstate
class NoopLock:
451class NoopLock(object):
452    def __enter__(self):
453        return self
454
455    def __exit__(self, exc_type, exc_value, traceback):
456        pass
457
458    def acquire(self, blocking=False):
459        pass
460
461    def release(self):
462        pass
def acquire(self, blocking=False):
458    def acquire(self, blocking=False):
459        pass
def release(self):
461    def release(self):
462        pass
def must_be_writable(orig_func):
465def must_be_writable(orig_func):
466    @functools.wraps(orig_func)
467    def must_be_writable_wrapper(self, *args, **kwargs):
468        if not self.writable():
469            raise IOError(errno.EROFS, "Collection is read-only.")
470        return orig_func(self, *args, **kwargs)
471    return must_be_writable_wrapper
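
Both decorators rely on conventions of the decorated class: synchronized expects a `self.lock` context manager (a real lock, or NoopLock for lock-free use), and must_be_writable expects a `writable()` method. An illustrative sketch, not taken from the SDK:

    import threading
    from arvados.arvfile import NoopLock, must_be_writable, synchronized

    class Example:
        def __init__(self, writable=True):
            self.lock = threading.RLock() if writable else NoopLock()
            self._writable = writable
            self._items = []

        def writable(self):
            return self._writable

        @must_be_writable
        @synchronized
        def append(self, item):
            # Raises IOError(errno.EROFS) when read-only; otherwise runs under self.lock.
            self._items.append(item)
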
class ArvadosFile:
 817class ArvadosFile(object):
 818    """Represent a file in a Collection.
 819
 820    ArvadosFile manages the underlying representation of a file in Keep as a
 821    sequence of segments spanning a set of blocks, and implements random
 822    read/write access.
 823
 824    This object may be accessed from multiple threads.
 825
 826    """
 827
 828    __slots__ = ('parent', 'name', '_writers', '_committed',
 829                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 830
 831    def __init__(self, parent, name, stream=[], segments=[]):
 832        """
 833        ArvadosFile constructor.
 834
 835        :stream:
 836          a list of Range objects representing a block stream
 837
 838        :segments:
 839          a list of Range objects representing segments
 840        """
 841        self.parent = parent
 842        self.name = name
 843        self._writers = set()
 844        self._committed = False
 845        self._segments = []
 846        self.lock = parent.root_collection().lock
 847        for s in segments:
 848            self._add_segment(stream, s.locator, s.range_size)
 849        self._current_bblock = None
 850        self._read_counter = 0
 851
 852    def writable(self):
 853        return self.parent.writable()
 854
 855    @synchronized
 856    def permission_expired(self, as_of_dt=None):
 857        """Returns True if any of the segment's locators is expired"""
 858        for r in self._segments:
 859            if KeepLocator(r.locator).permission_expired(as_of_dt):
 860                return True
 861        return False
 862
 863    @synchronized
 864    def has_remote_blocks(self):
 865        """Returns True if any of the segment's locators has a +R signature"""
 866
 867        for s in self._segments:
 868            if '+R' in s.locator:
 869                return True
 870        return False
 871
 872    @synchronized
 873    def _copy_remote_blocks(self, remote_blocks={}):
 874        """Ask Keep to copy remote blocks and point to their local copies.
 875
 876        This is called from the parent Collection.
 877
 878        :remote_blocks:
 879            Shared cache of remote to local block mappings. This is used to avoid
 880            doing extra work when blocks are shared by more than one file in
 881            different subdirectories.
 882        """
 883
 884        for s in self._segments:
 885            if '+R' in s.locator:
 886                try:
 887                    loc = remote_blocks[s.locator]
 888                except KeyError:
 889                    loc = self.parent._my_keep().refresh_signature(s.locator)
 890                    remote_blocks[s.locator] = loc
 891                s.locator = loc
 892                self.parent.set_committed(False)
 893        return remote_blocks
 894
 895    @synchronized
 896    def segments(self):
 897        return copy.copy(self._segments)
 898
 899    @synchronized
 900    def clone(self, new_parent, new_name):
 901        """Make a copy of this file."""
 902        cp = ArvadosFile(new_parent, new_name)
 903        cp.replace_contents(self)
 904        return cp
 905
 906    @must_be_writable
 907    @synchronized
 908    def replace_contents(self, other):
 909        """Replace segments of this file with segments from another `ArvadosFile` object."""
 910
 911        map_loc = {}
 912        self._segments = []
 913        for other_segment in other.segments():
 914            new_loc = other_segment.locator
 915            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
 916                if other_segment.locator not in map_loc:
 917                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
 918                    if bufferblock.state() != _BufferBlock.WRITABLE:
 919                        map_loc[other_segment.locator] = bufferblock.locator()
 920                    else:
 921                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
 922                new_loc = map_loc[other_segment.locator]
 923
 924            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 925
 926        self.set_committed(False)
 927
 928    def __eq__(self, other):
 929        if other is self:
 930            return True
 931        if not isinstance(other, ArvadosFile):
 932            return False
 933
 934        othersegs = other.segments()
 935        with self.lock:
 936            if len(self._segments) != len(othersegs):
 937                return False
 938            for i in range(0, len(othersegs)):
 939                seg1 = self._segments[i]
 940                seg2 = othersegs[i]
 941                loc1 = seg1.locator
 942                loc2 = seg2.locator
 943
 944                if self.parent._my_block_manager().is_bufferblock(loc1):
 945                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
 946
 947                if other.parent._my_block_manager().is_bufferblock(loc2):
 948                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
 949
 950                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
 951                    seg1.range_start != seg2.range_start or
 952                    seg1.range_size != seg2.range_size or
 953                    seg1.segment_offset != seg2.segment_offset):
 954                    return False
 955
 956        return True
 957
 958    def __ne__(self, other):
 959        return not self.__eq__(other)
 960
 961    @synchronized
 962    def set_segments(self, segs):
 963        self._segments = segs
 964
 965    @synchronized
 966    def set_committed(self, value=True):
 967        """Set committed flag.
 968
 969        If value is True, set committed to be True.
 970
 971        If value is False, set committed to be False for this and all parents.
 972        """
 973        if value == self._committed:
 974            return
 975        self._committed = value
 976        if self._committed is False and self.parent is not None:
 977            self.parent.set_committed(False)
 978
 979    @synchronized
 980    def committed(self):
 981        """Get whether this is committed or not."""
 982        return self._committed
 983
 984    @synchronized
 985    def add_writer(self, writer):
 986        """Add an ArvadosFileWriter reference to the list of writers"""
 987        if isinstance(writer, ArvadosFileWriter):
 988            self._writers.add(writer)
 989
 990    @synchronized
 991    def remove_writer(self, writer, flush):
 992        """
 993        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 994        and do some block maintenance tasks.
 995        """
 996        self._writers.remove(writer)
 997
 998        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 999            # File writer closed, not small enough for repacking
1000            self.flush()
1001        elif self.closed():
1002            # All writers closed and size is adequate for repacking
1003            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1004
1005    def closed(self):
1006        """
1007        Get whether this is closed or not. When the writers list is empty, the file
1008        is supposed to be closed.
1009        """
1010        return len(self._writers) == 0
1011
1012    @must_be_writable
1013    @synchronized
1014    def truncate(self, size):
1015        """Shrink or expand the size of the file.
1016
1017        If `size` is less than the size of the file, the file contents after
1018        `size` will be discarded.  If `size` is greater than the current size
1019        of the file, it will be filled with zero bytes.
1020
1021        """
1022        if size < self.size():
1023            new_segs = []
1024            for r in self._segments:
1025                range_end = r.range_start+r.range_size
1026                if r.range_start >= size:
1027                    # segment is past the truncate size, all done
1028                    break
1029                elif size < range_end:
1030                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1031                    nr.segment_offset = r.segment_offset
1032                    new_segs.append(nr)
1033                    break
1034                else:
1035                    new_segs.append(r)
1036
1037            self._segments = new_segs
1038            self.set_committed(False)
1039        elif size > self.size():
1040            padding = self.parent._my_block_manager().get_padding_block()
1041            diff = size - self.size()
1042            while diff > config.KEEP_BLOCK_SIZE:
1043                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1044                diff -= config.KEEP_BLOCK_SIZE
1045            if diff > 0:
1046                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1047            self.set_committed(False)
1048        else:
1049            # size == self.size()
1050            pass
1051
1052    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
1053        """Read up to `size` bytes from the file starting at `offset`.
1054
1055        Arguments:
1056
1057        * exact: bool --- If False (default), return less data than
1058         requested if the read crosses a block boundary and the next
1059         block isn't cached.  If True, only return less data than
1060         requested when hitting EOF.
1061
1062        * return_memoryview: bool --- If False (default) return a
1063          `bytes` object, which may entail making a copy in some
1064          situations.  If True, return a `memoryview` object which may
1065          avoid making a copy, but may be incompatible with code
1066          expecting a `bytes` object.
1067
1068        """
1069
1070        with self.lock:
1071            if size == 0 or offset >= self.size():
1072                return memoryview(b'') if return_memoryview else b''
1073            readsegs = locators_and_ranges(self._segments, offset, size)
1074
1075            prefetch = None
1076            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1077            if prefetch_lookahead:
1078                # Doing prefetch on every read() call is surprisingly expensive
1079                # when we're trying to deliver data at 600+ MiBps and want
1080                # the read() fast path to be as lightweight as possible.
1081                #
1082                # Only prefetching every 128 read operations
1083                # dramatically reduces the overhead while still
1084                # getting the benefit of prefetching (e.g. when
1085                # reading 128 KiB at a time, it checks for prefetch
1086                # every 16 MiB).
1087                self._read_counter = (self._read_counter+1) % 128
1088                if self._read_counter == 1:
1089                    prefetch = locators_and_ranges(self._segments,
1090                                                   offset + size,
1091                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1092                                                   limit=(1+prefetch_lookahead))
1093
1094        locs = set()
1095        data = []
1096        for lr in readsegs:
1097            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1098            if block:
1099                blockview = memoryview(block)
1100                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1101                locs.add(lr.locator)
1102            else:
1103                break
1104
1105        if prefetch:
1106            for lr in prefetch:
1107                if lr.locator not in locs:
1108                    self.parent._my_block_manager().block_prefetch(lr.locator)
1109                    locs.add(lr.locator)
1110
1111        if len(data) == 1:
1112            return data[0] if return_memoryview else data[0].tobytes()
1113        else:
1114            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1115
1116
1117    @must_be_writable
1118    @synchronized
1119    def writeto(self, offset, data, num_retries):
1120        """Write `data` to the file starting at `offset`.
1121
1122        This will update existing bytes and/or extend the size of the file as
1123        necessary.
1124
1125        """
1126        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1127            data = data.encode()
1128        if len(data) == 0:
1129            return
1130
1131        if offset > self.size():
1132            self.truncate(offset)
1133
1134        if len(data) > config.KEEP_BLOCK_SIZE:
1135            # Chunk it up into smaller writes
1136            n = 0
1137            dataview = memoryview(data)
1138            while n < len(data):
1139                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1140                n += config.KEEP_BLOCK_SIZE
1141            return
1142
1143        self.set_committed(False)
1144
1145        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1146            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1147
1148        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1149            self._current_bblock.repack_writes()
1150            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1151                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1152                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1153
1154        self._current_bblock.append(data)
1155
1156        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1157
1158        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1159
1160        return len(data)
1161
1162    @synchronized
1163    def flush(self, sync=True, num_retries=0):
1164        """Flush the current bufferblock to Keep.
1165
1166        :sync:
1167          If True, commit block synchronously, wait until buffer block has been written.
1168          If False, commit block asynchronously, return immediately after putting block into
1169          the keep put queue.
1170        """
1171        if self.committed():
1172            return
1173
1174        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1175            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1176                self._current_bblock.repack_writes()
1177            if self._current_bblock.state() != _BufferBlock.DELETED:
1178                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1179
1180        if sync:
1181            to_delete = set()
1182            for s in self._segments:
1183                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1184                if bb:
1185                    if bb.state() != _BufferBlock.COMMITTED:
1186                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1187                    to_delete.add(s.locator)
1188                    s.locator = bb.locator()
1189            for s in to_delete:
1190                # Don't delete the bufferblock if it's owned by many files. It'll be
1191                # deleted after all of its owners are flush()ed.
1192                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1193                    self.parent._my_block_manager().delete_bufferblock(s)
1194
1195        self.parent.notify(MOD, self.parent, self.name, (self, self))
1196
1197    @must_be_writable
1198    @synchronized
1199    def add_segment(self, blocks, pos, size):
1200        """Add a segment to the end of the file.
1201
1202        `pos` and `offset` reference a section of the stream described by
1203        `blocks` (a list of Range objects)
1204
1205        """
1206        self._add_segment(blocks, pos, size)
1207
1208    def _add_segment(self, blocks, pos, size):
1209        """Internal implementation of add_segment."""
1210        self.set_committed(False)
1211        for lr in locators_and_ranges(blocks, pos, size):
1212            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1213            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1214            self._segments.append(r)
1215
1216    @synchronized
1217    def size(self):
1218        """Get the file size."""
1219        if self._segments:
1220            n = self._segments[-1]
1221            return n.range_start + n.range_size
1222        else:
1223            return 0
1224
1225    @synchronized
1226    def manifest_text(self, stream_name=".", portable_locators=False,
1227                      normalize=False, only_committed=False):
1228        buf = ""
1229        filestream = []
1230        for segment in self._segments:
1231            loc = segment.locator
1232            if self.parent._my_block_manager().is_bufferblock(loc):
1233                if only_committed:
1234                    continue
1235                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1236            if portable_locators:
1237                loc = KeepLocator(loc).stripped()
1238            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1239                                 segment.segment_offset, segment.range_size))
1240        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1241        buf += "\n"
1242        return buf
1243
1244    @must_be_writable
1245    @synchronized
1246    def _reparent(self, newparent, newname):
1247        self.set_committed(False)
1248        self.flush(sync=True)
1249        self.parent.remove(self.name)
1250        self.parent = newparent
1251        self.name = newname
1252        self.lock = self.parent.root_collection().lock

Represent a file in a Collection.

ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.

This object may be accessed from multiple threads.

ArvadosFile(parent, name, stream=[], segments=[])
831    def __init__(self, parent, name, stream=[], segments=[]):
832        """
833        ArvadosFile constructor.
834
835        :stream:
836          a list of Range objects representing a block stream
837
838        :segments:
839          a list of Range objects representing segments
840        """
841        self.parent = parent
842        self.name = name
843        self._writers = set()
844        self._committed = False
845        self._segments = []
846        self.lock = parent.root_collection().lock
847        for s in segments:
848            self._add_segment(stream, s.locator, s.range_size)
849        self._current_bblock = None
850        self._read_counter = 0

ArvadosFile constructor.

stream: a list of Range objects representing a block stream

segments: a list of Range objects representing segments

parent
name
lock
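
ArvadosFile objects are normally created and owned by a Collection rather than constructed directly. A hedged sketch of reaching one through the collection API (the one-file manifest below uses the empty-block locator, i.e. the md5 of zero bytes):

    import arvados.collection

    coll = arvados.collection.Collection(
        ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo.txt\n")
    afile = coll.find('foo.txt')    # the underlying ArvadosFile for a plain file path
    afile.size()                    # -> 0
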
def writable(self):
852    def writable(self):
853        return self.parent.writable()
@synchronized
def permission_expired(self, as_of_dt=None):
855    @synchronized
856    def permission_expired(self, as_of_dt=None):
857        """Returns True if any of the segment's locators is expired"""
858        for r in self._segments:
859            if KeepLocator(r.locator).permission_expired(as_of_dt):
860                return True
861        return False

Returns True if any of the segments' locators has expired.

@synchronized
def has_remote_blocks(self):
863    @synchronized
864    def has_remote_blocks(self):
865        """Returns True if any of the segment's locators has a +R signature"""
866
867        for s in self._segments:
868            if '+R' in s.locator:
869                return True
870        return False

Returns True if any of the segments' locators has a +R signature.

@synchronized
def segments(self):
895    @synchronized
896    def segments(self):
897        return copy.copy(self._segments)
@synchronized
def clone(self, new_parent, new_name):
899    @synchronized
900    def clone(self, new_parent, new_name):
901        """Make a copy of this file."""
902        cp = ArvadosFile(new_parent, new_name)
903        cp.replace_contents(self)
904        return cp

Make a copy of this file.

@must_be_writable
@synchronized
def replace_contents(self, other):
906    @must_be_writable
907    @synchronized
908    def replace_contents(self, other):
909        """Replace segments of this file with segments from another `ArvadosFile` object."""
910
911        map_loc = {}
912        self._segments = []
913        for other_segment in other.segments():
914            new_loc = other_segment.locator
915            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
916                if other_segment.locator not in map_loc:
917                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
918                    if bufferblock.state() != _BufferBlock.WRITABLE:
919                        map_loc[other_segment.locator] = bufferblock.locator()
920                    else:
921                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
922                new_loc = map_loc[other_segment.locator]
923
924            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
925
926        self.set_committed(False)

Replace segments of this file with segments from another ArvadosFile object.

@synchronized
def set_segments(self, segs):
961    @synchronized
962    def set_segments(self, segs):
963        self._segments = segs
@synchronized
def set_committed(self, value=True):
965    @synchronized
966    def set_committed(self, value=True):
967        """Set committed flag.
968
969        If value is True, set committed to be True.
970
971        If value is False, set committed to be False for this and all parents.
972        """
973        if value == self._committed:
974            return
975        self._committed = value
976        if self._committed is False and self.parent is not None:
977            self.parent.set_committed(False)

Set committed flag.

If value is True, set committed to be True.

If value is False, set committed to be False for this and all parents.

@synchronized
def committed(self):
979    @synchronized
980    def committed(self):
981        """Get whether this is committed or not."""
982        return self._committed

Get whether this is committed or not.

@synchronized
def add_writer(self, writer):
984    @synchronized
985    def add_writer(self, writer):
986        """Add an ArvadosFileWriter reference to the list of writers"""
987        if isinstance(writer, ArvadosFileWriter):
988            self._writers.add(writer)

Add an ArvadosFileWriter reference to the list of writers

@synchronized
def remove_writer(self, writer, flush):
 990    @synchronized
 991    def remove_writer(self, writer, flush):
 992        """
 993        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 994        and do some block maintenance tasks.
 995        """
 996        self._writers.remove(writer)
 997
 998        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 999            # File writer closed, not small enough for repacking
1000            self.flush()
1001        elif self.closed():
1002            # All writers closed and size is adequate for repacking
1003            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks.

def closed(self):
1005    def closed(self):
1006        """
1007        Get whether this is closed or not. When the writers list is empty, the file
1008        is supposed to be closed.
1009        """
1010        return len(self._writers) == 0

Return whether this file is closed. The file is considered closed when its writers list is empty.

@must_be_writable
@synchronized
def truncate(self, size):
1012    @must_be_writable
1013    @synchronized
1014    def truncate(self, size):
1015        """Shrink or expand the size of the file.
1016
1017        If `size` is less than the size of the file, the file contents after
1018        `size` will be discarded.  If `size` is greater than the current size
1019        of the file, it will be filled with zero bytes.
1020
1021        """
1022        if size < self.size():
1023            new_segs = []
1024            for r in self._segments:
1025                range_end = r.range_start+r.range_size
1026                if r.range_start >= size:
1027                    # segment is past the truncate size, all done
1028                    break
1029                elif size < range_end:
1030                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1031                    nr.segment_offset = r.segment_offset
1032                    new_segs.append(nr)
1033                    break
1034                else:
1035                    new_segs.append(r)
1036
1037            self._segments = new_segs
1038            self.set_committed(False)
1039        elif size > self.size():
1040            padding = self.parent._my_block_manager().get_padding_block()
1041            diff = size - self.size()
1042            while diff > config.KEEP_BLOCK_SIZE:
1043                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1044                diff -= config.KEEP_BLOCK_SIZE
1045            if diff > 0:
1046                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1047            self.set_committed(False)
1048        else:
1049            # size == self.size()
1050            pass

Shrink or expand the size of the file.

If size is less than the size of the file, the file contents after size will be discarded. If size is greater than the current size of the file, it will be filled with zero bytes.
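
A short sketch of both directions, continuing the hedged `afile` example above (a file inside a writable collection):

    afile.writeto(0, b'0123456789', num_retries=0)
    afile.truncate(4)                                   # keep only b'0123'
    afile.truncate(10)                                  # grow again; bytes 4..9 map to the zero padding block
    afile.readfrom(0, 10, num_retries=0, exact=True)    # -> b'0123' + b'\x00' * 6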

def readfrom( self, offset, size, num_retries, exact=False, return_memoryview=False):
1052    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
1053        """Read up to `size` bytes from the file starting at `offset`.
1054
1055        Arguments:
1056
1057        * exact: bool --- If False (default), return less data than
1058         requested if the read crosses a block boundary and the next
1059         block isn't cached.  If True, only return less data than
1060         requested when hitting EOF.
1061
1062        * return_memoryview: bool --- If False (default) return a
1063          `bytes` object, which may entail making a copy in some
1064          situations.  If True, return a `memoryview` object which may
1065          avoid making a copy, but may be incompatible with code
1066          expecting a `bytes` object.
1067
1068        """
1069
1070        with self.lock:
1071            if size == 0 or offset >= self.size():
1072                return memoryview(b'') if return_memoryview else b''
1073            readsegs = locators_and_ranges(self._segments, offset, size)
1074
1075            prefetch = None
1076            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1077            if prefetch_lookahead:
1078                # Doing prefetch on every read() call is surprisingly expensive
1079                # when we're trying to deliver data at 600+ MiBps and want
1080                # the read() fast path to be as lightweight as possible.
1081                #
1082                # Only prefetching every 128 read operations
1083                # dramatically reduces the overhead while still
1084                # getting the benefit of prefetching (e.g. when
1085                # reading 128 KiB at a time, it checks for prefetch
1086                # every 16 MiB).
1087                self._read_counter = (self._read_counter+1) % 128
1088                if self._read_counter == 1:
1089                    prefetch = locators_and_ranges(self._segments,
1090                                                   offset + size,
1091                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1092                                                   limit=(1+prefetch_lookahead))
1093
1094        locs = set()
1095        data = []
1096        for lr in readsegs:
1097            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1098            if block:
1099                blockview = memoryview(block)
1100                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1101                locs.add(lr.locator)
1102            else:
1103                break
1104
1105        if prefetch:
1106            for lr in prefetch:
1107                if lr.locator not in locs:
1108                    self.parent._my_block_manager().block_prefetch(lr.locator)
1109                    locs.add(lr.locator)
1110
1111        if len(data) == 1:
1112            return data[0] if return_memoryview else data[0].tobytes()
1113        else:
1114            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)

Read up to size bytes from the file starting at offset.

Arguments:

  • exact: bool — If False (default), return less data than requested if the read crosses a block boundary and the next block isn’t cached. If True, only return less data than requested when hitting EOF.

  • return_memoryview: bool — If False (default) return a bytes object, which may entail making a copy in some situations. If True, return a memoryview object which may avoid making a copy, but may be incompatible with code expecting a bytes object.

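A short sketch of the two keyword arguments, using the same assumed `afile`:

    data = afile.readfrom(0, 4, num_retries=0, exact=True)               # bytes, full length short of EOF
    view = afile.readfrom(0, 4, num_retries=0, return_memoryview=True)   # memoryview; may avoid a copy
    assert bytes(view) == data    # convert explicitly when a bytes object is required
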
@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
1117    @must_be_writable
1118    @synchronized
1119    def writeto(self, offset, data, num_retries):
1120        """Write `data` to the file starting at `offset`.
1121
1122        This will update existing bytes and/or extend the size of the file as
1123        necessary.
1124
1125        """
1126        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1127            data = data.encode()
1128        if len(data) == 0:
1129            return
1130
1131        if offset > self.size():
1132            self.truncate(offset)
1133
1134        if len(data) > config.KEEP_BLOCK_SIZE:
1135            # Chunk it up into smaller writes
1136            n = 0
1137            dataview = memoryview(data)
1138            while n < len(data):
1139                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1140                n += config.KEEP_BLOCK_SIZE
1141            return
1142
1143        self.set_committed(False)
1144
1145        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1146            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1147
1148        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1149            self._current_bblock.repack_writes()
1150            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1151                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1152                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1153
1154        self._current_bblock.append(data)
1155
1156        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1157
1158        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1159
1160        return len(data)

Write data to the file starting at offset.

This will update existing bytes and/or extend the size of the file as necessary.
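
For example (same hedged `afile`), a write past the current end first pads the gap with zeros via truncate(), and str data is encoded to bytes:

    afile.writeto(0, b'hello', num_retries=0)
    afile.writeto(10, 'world', num_retries=0)    # offset 10 > size 5: bytes 5..9 become zero padding
    afile.size()                                 # -> 15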

@synchronized
def flush(self, sync=True, num_retries=0):
1162    @synchronized
1163    def flush(self, sync=True, num_retries=0):
1164        """Flush the current bufferblock to Keep.
1165
1166        :sync:
1167          If True, commit block synchronously, wait until buffer block has been written.
1168          If False, commit block asynchronously, return immediately after putting block into
1169          the keep put queue.
1170        """
1171        if self.committed():
1172            return
1173
1174        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1175            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1176                self._current_bblock.repack_writes()
1177            if self._current_bblock.state() != _BufferBlock.DELETED:
1178                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1179
1180        if sync:
1181            to_delete = set()
1182            for s in self._segments:
1183                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1184                if bb:
1185                    if bb.state() != _BufferBlock.COMMITTED:
1186                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1187                    to_delete.add(s.locator)
1188                    s.locator = bb.locator()
1189            for s in to_delete:
1190                # Don't delete the bufferblock if it's owned by many files. It'll be
1191                # deleted after all of its owners are flush()ed.
1192                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1193                    self.parent._my_block_manager().delete_bufferblock(s)
1194
1195        self.parent.notify(MOD, self.parent, self.name, (self, self))

Flush the current bufferblock to Keep.

sync: If True, commit the block synchronously and wait until the buffer block has been written. If False, commit the block asynchronously and return immediately after putting the block into the Keep put queue.
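
A minimal sketch of the two modes, with the same assumptions as above:

    afile.flush(sync=False)   # queue the current bufferblock for upload and return right away
    # ...or...
    afile.flush(sync=True)    # wait until every segment points at a committed Keep locator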

@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
1197    @must_be_writable
1198    @synchronized
1199    def add_segment(self, blocks, pos, size):
1200        """Add a segment to the end of the file.
1201
1202        `pos` and `offset` reference a section of the stream described by
1203        `blocks` (a list of Range objects)
1204
1205        """
1206        self._add_segment(blocks, pos, size)

Add a segment to the end of the file.

pos and size reference a section of the stream described by blocks (a list of Range objects).

@synchronized
def size(self):
1216    @synchronized
1217    def size(self):
1218        """Get the file size."""
1219        if self._segments:
1220            n = self._segments[-1]
1221            return n.range_start + n.range_size
1222        else:
1223            return 0

Get the file size.

@synchronized
def manifest_text( self, stream_name='.', portable_locators=False, normalize=False, only_committed=False):
1225    @synchronized
1226    def manifest_text(self, stream_name=".", portable_locators=False,
1227                      normalize=False, only_committed=False):
1228        buf = ""
1229        filestream = []
1230        for segment in self._segments:
1231            loc = segment.locator
1232            if self.parent._my_block_manager().is_bufferblock(loc):
1233                if only_committed:
1234                    continue
1235                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1236            if portable_locators:
1237                loc = KeepLocator(loc).stripped()
1238            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1239                                 segment.segment_offset, segment.range_size))
1240        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1241        buf += "\n"
1242        return buf
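
For illustration (same hedged `afile`), manifest_text() renders this single file as one normalized stream line:

    text = afile.manifest_text(stream_name='./output', portable_locators=True)
    # e.g. './output <block locator> 0:<size>:foo.txt\n'; portable_locators strips
    # per-request permission hints (+A...) from the block locators.
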
fuse_entry
class ArvadosFileReader(ArvadosFileReaderBase):
1255class ArvadosFileReader(ArvadosFileReaderBase):
1256    """Wraps ArvadosFile in a file-like object supporting reading only.
1257
1258    Be aware that this class is NOT thread safe as there is no locking around
1259    updating file pointer.
1260
1261    """
1262
1263    def __init__(self, arvadosfile, mode="r", num_retries=None):
1264        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1265        self.arvadosfile = arvadosfile
1266
1267    def size(self):
1268        return self.arvadosfile.size()
1269
1270    def stream_name(self):
1271        return self.arvadosfile.parent.stream_name()
1272
1273    def readinto(self, b):
1274        data = self.read(len(b))
1275        b[:len(data)] = data
1276        return len(data)
1277
1278    @_FileLikeObjectBase._before_close
1279    @retry_method
1280    def read(self, size=-1, num_retries=None, return_memoryview=False):
1281        """Read up to `size` bytes from the file and return the result.
1282
1283        Starts at the current file position.  If `size` is negative or None,
1284        read the entire remainder of the file.
1285
1286        Returns None if the file pointer is at the end of the file.
1287
1288        Returns a `bytes` object, unless `return_memoryview` is True,
1289        in which case it returns a memory view, which may avoid an
1290        unnecessary data copy in some situations.
1291
1292        """
1293        if size < 0 or size is None:
1294            data = []
1295            #
1296            # specify exact=False, return_memoryview=True here so that we
1297            # only copy data once into the final buffer.
1298            #
1299            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1300            while rd:
1301                data.append(rd)
1302                self._filepos += len(rd)
1303                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
1304            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
1305        else:
1306            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
1307            self._filepos += len(data)
1308            return data
1309
1310    @_FileLikeObjectBase._before_close
1311    @retry_method
1312    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
1313        """Read up to `size` bytes from the stream, starting at the specified file offset.
1314
1315        This method does not change the file position.
1316
1317        Returns a `bytes` object, unless `return_memoryview` is True,
1318        in which case it returns a memory view, which may avoid an
1319        unnecessary data copy in some situations.
1320
1321        """
1322        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
1323
1324    def flush(self):
1325        pass

Wraps ArvadosFile in a file-like object supporting reading only.

Be aware that this class is NOT thread safe: there is no locking around updates to the file pointer.
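
In practice these readers are usually obtained from a collection rather than constructed by hand. A hedged sketch (the UUID is a placeholder):

    import arvados.collection

    coll = arvados.collection.CollectionReader('zzzzz-4zz18-xxxxxxxxxxxxxxx')
    with coll.open('summary.txt', 'rb') as f:
        head = f.read(1024)    # ordinary file-object calls; 'rb' yields bytes, 'r' yields str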

ArvadosFileReader(arvadosfile, mode='r', num_retries=None)
class ArvadosFileWriter(ArvadosFileReader):
1328class ArvadosFileWriter(ArvadosFileReader):
1329    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1330
1331    Be aware that this class is NOT thread safe as there is no locking around
1332    updating file pointer.
1333
1334    """
1335
1336    def __init__(self, arvadosfile, mode, num_retries=None):
1337        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1338        self.arvadosfile.add_writer(self)
1339
1340    def writable(self):
1341        return True
1342
1343    @_FileLikeObjectBase._before_close
1344    @retry_method
1345    def write(self, data, num_retries=None):
1346        if self.mode[0] == "a":
1347            self._filepos = self.size()
1348        self.arvadosfile.writeto(self._filepos, data, num_retries)
1349        self._filepos += len(data)
1350        return len(data)
1351
1352    @_FileLikeObjectBase._before_close
1353    @retry_method
1354    def writelines(self, seq, num_retries=None):
1355        for s in seq:
1356            self.write(s, num_retries=num_retries)
1357
1358    @_FileLikeObjectBase._before_close
1359    def truncate(self, size=None):
1360        if size is None:
1361            size = self._filepos
1362        self.arvadosfile.truncate(size)
1363
1364    @_FileLikeObjectBase._before_close
1365    def flush(self):
1366        self.arvadosfile.flush()
1367
1368    def close(self, flush=True):
1369        if not self.closed:
1370            self.arvadosfile.remove_writer(self, flush)
1371            super(ArvadosFileWriter, self).close()

Wraps ArvadosFile in a file-like object supporting both reading and writing.

Be aware that this class is NOT thread safe, as there is no locking around updating the file pointer.

ArvadosFileWriter(arvadosfile, mode, num_retries=None)
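
A minimal usage sketch, not taken from the source: it assumes writer is an ArvadosFileWriter opened elsewhere in a writable mode and that the caller passes bytes; replace_contents is a hypothetical helper. Note that in "a" (append) modes write() always seeks to the end of the file first, as shown in write() above.

    def replace_contents(writer, payload):
        # Overwrite the file with `payload` (bytes), drop anything left over
        # past the new end, and commit buffered blocks without closing.
        writer.seek(0)
        writer.write(payload)      # returns the number of bytes written
        writer.truncate()          # with no argument, truncates at the current position
        writer.flush()

    # writelines() simply calls write() for each item:
    # writer.writelines([b"line 1\n", b"line 2\n"])
    # close() flushes by default; close(flush=False) skips the final flush.
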
class WrappableFile:
1374class WrappableFile(object):
1375    """An interface to an Arvados file that's compatible with io wrappers.
1376
1377    """
1378    def __init__(self, f):
1379        self.f = f
1380        self.closed = False
1381    def close(self):
1382        self.closed = True
1383        return self.f.close()
1384    def flush(self):
1385        return self.f.flush()
1386    def read(self, *args, **kwargs):
1387        return self.f.read(*args, **kwargs)
1388    def readable(self):
1389        return self.f.readable()
1390    def readinto(self, *args, **kwargs):
1391        return self.f.readinto(*args, **kwargs)
1392    def seek(self, *args, **kwargs):
1393        return self.f.seek(*args, **kwargs)
1394    def seekable(self):
1395        return self.f.seekable()
1396    def tell(self):
1397        return self.f.tell()
1398    def writable(self):
1399        return self.f.writable()
1400    def write(self, *args, **kwargs):
1401        return self.f.write(*args, **kwargs)

An interface to an Arvados file that’s compatible with io wrappers.

WrappableFile(f)
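
A minimal sketch, not taken from the source: the standard io wrappers expect the read/readinto/seek/close surface that WrappableFile delegates to the underlying file. It assumes reader is an ArvadosFileReader obtained elsewhere.

    import io

    # Decode an Arvados file as UTF-8 text through the standard io machinery.
    text = io.TextIOWrapper(WrappableFile(reader), encoding='utf-8')
    try:
        for line in text:
            pass                    # process each decoded line
    finally:
        text.close()                # closes the WrappableFile, which closes the wrapped file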