arvados.arvfile

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4
   5import bz2
   6import collections
   7import copy
   8import errno
   9import functools
  10import hashlib
  11import logging
  12import os
  13import queue
  14import re
  15import sys
  16import threading
  17import uuid
  18import zlib
  19
  20from . import config
  21from .errors import KeepWriteError, AssertionError, ArgumentError
  22from .keep import KeepLocator
  23from ._normalize_stream import normalize_stream
  24from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
  25from .retry import retry_method
  26
  27MOD = "mod"
  28WRITE = "write"
  29
  30_logger = logging.getLogger('arvados.arvfile')
  31
  32def split(path):
  33    """split(path) -> streamname, filename
  34
  35    Separate the stream name and file name in a /-separated stream path and
  36    return a tuple (stream_name, file_name).  If no stream name is available,
  37    assume '.'.
  38
  39    """
  40    try:
  41        stream_name, file_name = path.rsplit('/', 1)
  42    except ValueError:  # No / in string
  43        stream_name, file_name = '.', path
  44    return stream_name, file_name
  45
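# Illustrative examples: split() divides on the last '/' only, so a nested
# path keeps its full stream name.
#
#     split('a/b/c.txt')   # -> ('a/b', 'c.txt')
#     split('c.txt')       # -> ('.', 'c.txt')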
  46
  47class UnownedBlockError(Exception):
  48    """Raised when there's an writable block without an owner on the BlockManager."""
  49    pass
  50
  51
  52class _FileLikeObjectBase(object):
  53    def __init__(self, name, mode):
  54        self.name = name
  55        self.mode = mode
  56        self.closed = False
  57
  58    @staticmethod
  59    def _before_close(orig_func):
  60        @functools.wraps(orig_func)
  61        def before_close_wrapper(self, *args, **kwargs):
  62            if self.closed:
  63                raise ValueError("I/O operation on closed stream file")
  64            return orig_func(self, *args, **kwargs)
  65        return before_close_wrapper
  66
  67    def __enter__(self):
  68        return self
  69
  70    def __exit__(self, exc_type, exc_value, traceback):
  71        try:
  72            self.close()
  73        except Exception:
  74            if exc_type is None:
  75                raise
  76
  77    def close(self):
  78        self.closed = True
  79
  80
  81class ArvadosFileReaderBase(_FileLikeObjectBase):
  82    def __init__(self, name, mode, num_retries=None):
  83        super(ArvadosFileReaderBase, self).__init__(name, mode)
  84        self._filepos = 0
  85        self.num_retries = num_retries
  86        self._readline_cache = (None, None)
  87
  88    def __iter__(self):
  89        while True:
  90            data = self.readline()
  91            if not data:
  92                break
  93            yield data
  94
  95    def decompressed_name(self):
  96        return re.sub(r'\.(bz2|gz)$', '', self.name)
  97
  98    @_FileLikeObjectBase._before_close
  99    def seek(self, pos, whence=os.SEEK_SET):
 100        if whence == os.SEEK_CUR:
 101            pos += self._filepos
 102        elif whence == os.SEEK_END:
 103            pos += self.size()
 104        if pos < 0:
 105            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
 106        self._filepos = pos
 107        return self._filepos
 108
 109    def tell(self):
 110        return self._filepos
 111
 112    def readable(self):
 113        return True
 114
 115    def writable(self):
 116        return False
 117
 118    def seekable(self):
 119        return True
 120
 121    @_FileLikeObjectBase._before_close
 122    @retry_method
 123    def readall(self, size=2**20, num_retries=None):
 124        while True:
 125            data = self.read(size, num_retries=num_retries)
 126            if len(data) == 0:
 127                break
 128            yield data
 129
 130    @_FileLikeObjectBase._before_close
 131    @retry_method
 132    def readline(self, size=float('inf'), num_retries=None):
 133        cache_pos, cache_data = self._readline_cache
 134        if self.tell() == cache_pos:
 135            data = [cache_data]
 136            self._filepos += len(cache_data)
 137        else:
 138            data = [b'']
 139        data_size = len(data[-1])
 140        while (data_size < size) and (b'\n' not in data[-1]):
 141            next_read = self.read(2 ** 20, num_retries=num_retries)
 142            if not next_read:
 143                break
 144            data.append(next_read)
 145            data_size += len(next_read)
 146        data = b''.join(data)
 147        try:
 148            nextline_index = data.index(b'\n') + 1
 149        except ValueError:
 150            nextline_index = len(data)
 151        nextline_index = min(nextline_index, size)
 152        self._filepos -= len(data) - nextline_index
 153        self._readline_cache = (self.tell(), data[nextline_index:])
 154        return data[:nextline_index].decode()
 155
 156    @_FileLikeObjectBase._before_close
 157    @retry_method
 158    def decompress(self, decompress, size, num_retries=None):
 159        for segment in self.readall(size, num_retries=num_retries):
 160            data = decompress(segment)
 161            if data:
 162                yield data
 163
 164    @_FileLikeObjectBase._before_close
 165    @retry_method
 166    def readall_decompressed(self, size=2**20, num_retries=None):
 167        self.seek(0)
 168        if self.name.endswith('.bz2'):
 169            dc = bz2.BZ2Decompressor()
 170            return self.decompress(dc.decompress, size,
 171                                   num_retries=num_retries)
 172        elif self.name.endswith('.gz'):
 173            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
 174            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
 175                                   size, num_retries=num_retries)
 176        else:
 177            return self.readall(size, num_retries=num_retries)
 178
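    # Illustrative sketch: readall_decompressed() picks a decompressor based on
    # the file name suffix and yields decompressed chunks.  Here 'reader' is
    # assumed to be an instance of a subclass (e.g. an ArvadosFileReader whose
    # name ends in '.gz'), and 'process' is a placeholder for caller code:
    #
    #     for chunk in reader.readall_decompressed():
    #         process(chunk)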
 179    @_FileLikeObjectBase._before_close
 180    @retry_method
 181    def readlines(self, sizehint=float('inf'), num_retries=None):
 182        data = []
 183        data_size = 0
 184        for s in self.readall(num_retries=num_retries):
 185            data.append(s)
 186            data_size += len(s)
 187            if data_size >= sizehint:
 188                break
 189        return b''.join(data).decode().splitlines(True)
 190
 191    def size(self):
 192        raise IOError(errno.ENOSYS, "Not implemented")
 193
 194    def read(self, size, num_retries=None):
 195        raise IOError(errno.ENOSYS, "Not implemented")
 196
 197    def readfrom(self, start, size, num_retries=None):
 198        raise IOError(errno.ENOSYS, "Not implemented")
 199
 200
 201class StreamFileReader(ArvadosFileReaderBase):
 202    class _NameAttribute(str):
 203        # The Python file API provides a plain .name attribute.
 204        # The older SDK provided a name() method.
 205        # This class provides both, for maximum compatibility.
 206        def __call__(self):
 207            return self
 208
 209    def __init__(self, stream, segments, name):
 210        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
 211        self._stream = stream
 212        self.segments = segments
 213
 214    def stream_name(self):
 215        return self._stream.name()
 216
 217    def size(self):
 218        n = self.segments[-1]
 219        return n.range_start + n.range_size
 220
 221    @_FileLikeObjectBase._before_close
 222    @retry_method
 223    def read(self, size, num_retries=None):
 224        """Read up to 'size' bytes from the stream, starting at the current file position"""
 225        if size == 0:
 226            return b''
 227
 228        data = b''
 229        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
 230        if available_chunks:
 231            lr = available_chunks[0]
 232            data = self._stream.readfrom(lr.locator+lr.segment_offset,
 233                                         lr.segment_size,
 234                                         num_retries=num_retries)
 235
 236        self._filepos += len(data)
 237        return data
 238
 239    @_FileLikeObjectBase._before_close
 240    @retry_method
 241    def readfrom(self, start, size, num_retries=None):
 242        """Read up to 'size' bytes from the stream, starting at 'start'"""
 243        if size == 0:
 244            return b''
 245
 246        data = []
 247        for lr in locators_and_ranges(self.segments, start, size):
 248            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
 249                                              num_retries=num_retries))
 250        return b''.join(data)
 251
 252    def as_manifest(self):
 253        segs = []
 254        for r in self.segments:
 255            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
 256        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
 257
 258
 259def synchronized(orig_func):
 260    @functools.wraps(orig_func)
 261    def synchronized_wrapper(self, *args, **kwargs):
 262        with self.lock:
 263            return orig_func(self, *args, **kwargs)
 264    return synchronized_wrapper
 265
 266
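# Usage note (illustrative): @synchronized assumes the decorated method's
# instance exposes a 'lock' attribute usable as a context manager, such as a
# threading.Lock or the NoopLock defined below; _BufferBlock, _BlockManager
# and ArvadosFile all provide one.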
 267class StateChangeError(Exception):
 268    def __init__(self, message, state, nextstate):
 269        super(StateChangeError, self).__init__(message)
 270        self.state = state
 271        self.nextstate = nextstate
 272
 273class _BufferBlock(object):
 274    """A stand-in for a Keep block that is in the process of being written.
 275
 276    Writers can append to it, get the size, and compute the Keep locator.
 277    There are three valid states:
 278
 279    WRITABLE
 280      Can append to block.
 281
 282    PENDING
 283      Block is in the process of being uploaded to Keep, append is an error.
 284
 285    COMMITTED
 286      The block has been written to Keep, its internal buffer has been
 287      released, fetching the block will fetch it via keep client (since we
 288      discarded the internal copy), and identifiers referring to the BufferBlock
 289      can be replaced with the block locator.
 290
 291    """
 292
 293    WRITABLE = 0
 294    PENDING = 1
 295    COMMITTED = 2
 296    ERROR = 3
 297    DELETED = 4
 298
 299    def __init__(self, blockid, starting_capacity, owner):
 300        """
 301        :blockid:
 302          the identifier for this block
 303
 304        :starting_capacity:
 305          the initial buffer capacity
 306
 307        :owner:
 308          ArvadosFile that owns this block
 309
 310        """
 311        self.blockid = blockid
 312        self.buffer_block = bytearray(starting_capacity)
 313        self.buffer_view = memoryview(self.buffer_block)
 314        self.write_pointer = 0
 315        self._state = _BufferBlock.WRITABLE
 316        self._locator = None
 317        self.owner = owner
 318        self.lock = threading.Lock()
 319        self.wait_for_commit = threading.Event()
 320        self.error = None
 321
 322    @synchronized
 323    def append(self, data):
 324        """Append some data to the buffer.
 325
 326        Only valid if the block is in WRITABLE state.  Implements an expanding
 327        buffer, doubling capacity as needed to accommodate all the data.
 328
 329        """
 330        if self._state == _BufferBlock.WRITABLE:
 331            if not isinstance(data, bytes) and not isinstance(data, memoryview):
 332                data = data.encode()
 333            while (self.write_pointer+len(data)) > len(self.buffer_block):
 334                new_buffer_block = bytearray(len(self.buffer_block) * 2)
 335                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
 336                self.buffer_block = new_buffer_block
 337                self.buffer_view = memoryview(self.buffer_block)
 338            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
 339            self.write_pointer += len(data)
 340            self._locator = None
 341        else:
 342            raise AssertionError("Buffer block is not writable")
 343
 344    STATE_TRANSITIONS = frozenset([
 345            (WRITABLE, PENDING),
 346            (PENDING, COMMITTED),
 347            (PENDING, ERROR),
 348            (ERROR, PENDING)])
 349
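    # Typical lifecycle, per STATE_TRANSITIONS above: a block starts WRITABLE,
    # moves to PENDING when handed off for upload, and to COMMITTED once its
    # Keep locator is known.  A failed upload moves it to ERROR, from which a
    # retry may move it back to PENDING.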
 350    @synchronized
 351    def set_state(self, nextstate, val=None):
 352        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
 353            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
 354        self._state = nextstate
 355
 356        if self._state == _BufferBlock.PENDING:
 357            self.wait_for_commit.clear()
 358
 359        if self._state == _BufferBlock.COMMITTED:
 360            self._locator = val
 361            self.buffer_view = None
 362            self.buffer_block = None
 363            self.wait_for_commit.set()
 364
 365        if self._state == _BufferBlock.ERROR:
 366            self.error = val
 367            self.wait_for_commit.set()
 368
 369    @synchronized
 370    def state(self):
 371        return self._state
 372
 373    def size(self):
 374        """The amount of data written to the buffer."""
 375        return self.write_pointer
 376
 377    @synchronized
 378    def locator(self):
 379        """The Keep locator for this buffer's contents."""
 380        if self._locator is None:
 381            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
 382        return self._locator
 383
 384    @synchronized
 385    def clone(self, new_blockid, owner):
 386        if self._state == _BufferBlock.COMMITTED:
 387            raise AssertionError("Cannot duplicate committed buffer block")
 388        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
 389        bufferblock.append(self.buffer_view[0:self.size()])
 390        return bufferblock
 391
 392    @synchronized
 393    def clear(self):
 394        self._state = _BufferBlock.DELETED
 395        self.owner = None
 396        self.buffer_block = None
 397        self.buffer_view = None
 398
 399    @synchronized
 400    def repack_writes(self):
 401        """Optimize buffer block by repacking segments in file sequence.
 402
 403        When the client makes random writes, they appear in the buffer block in
 404        the sequence they were written rather than the sequence they appear in
 405        the file.  This makes for inefficient, fragmented manifests.  Attempt
 406        to optimize by repacking writes in file sequence.
 407
 408        """
 409        if self._state != _BufferBlock.WRITABLE:
 410            raise AssertionError("Cannot repack non-writable block")
 411
 412        segs = self.owner.segments()
 413
 414        # Collect the segments that reference the buffer block.
 415        bufferblock_segs = [s for s in segs if s.locator == self.blockid]
 416
 417        # Collect total data referenced by segments (could be smaller than
 418        # bufferblock size if a portion of the file was written and
 419        # then overwritten).
 420        write_total = sum([s.range_size for s in bufferblock_segs])
 421
 422        if write_total < self.size() or len(bufferblock_segs) > 1:
 423            # If there's more than one segment referencing this block, it is
 424            # due to out-of-order writes and will produce a fragmented
 425            # manifest, so try to optimize by re-packing into a new buffer.
 426            contents = self.buffer_view[0:self.write_pointer].tobytes()
 427            new_bb = _BufferBlock(None, write_total, None)
 428            for t in bufferblock_segs:
 429                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
 430                t.segment_offset = new_bb.size() - t.range_size
 431
 432            self.buffer_block = new_bb.buffer_block
 433            self.buffer_view = new_bb.buffer_view
 434            self.write_pointer = new_bb.write_pointer
 435            self._locator = None
 436            new_bb.clear()
 437            self.owner.set_segments(segs)
 438
 439    def __repr__(self):
 440        return "<BufferBlock %s>" % (self.blockid)
 441
 442
 443class NoopLock(object):
 444    def __enter__(self):
 445        return self
 446
 447    def __exit__(self, exc_type, exc_value, traceback):
 448        pass
 449
 450    def acquire(self, blocking=False):
 451        pass
 452
 453    def release(self):
 454        pass
 455
 456
 457def must_be_writable(orig_func):
 458    @functools.wraps(orig_func)
 459    def must_be_writable_wrapper(self, *args, **kwargs):
 460        if not self.writable():
 461            raise IOError(errno.EROFS, "Collection is read-only.")
 462        return orig_func(self, *args, **kwargs)
 463    return must_be_writable_wrapper
 464
 465
 466class _BlockManager(object):
 467    """BlockManager handles buffer blocks.
 468
 469    Also handles background block uploads, and background block prefetch for a
 470    Collection of ArvadosFiles.
 471
 472    """
 473
 474    DEFAULT_PUT_THREADS = 2
 475
 476    def __init__(self, keep,
 477                 copies=None,
 478                 put_threads=None,
 479                 num_retries=None,
 480                 storage_classes_func=None):
 481        """keep: KeepClient object to use"""
 482        self._keep = keep
 483        self._bufferblocks = collections.OrderedDict()
 484        self._put_queue = None
 485        self._put_threads = None
 486        self.lock = threading.Lock()
 487        self.prefetch_lookahead = self._keep.num_prefetch_threads
 488        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
 489        self.copies = copies
 490        self.storage_classes = storage_classes_func or (lambda: [])
 491        self._pending_write_size = 0
 492        self.threads_lock = threading.Lock()
 493        self.padding_block = None
 494        self.num_retries = num_retries
 495
 496    @synchronized
 497    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
 498        """Allocate a new, empty bufferblock in WRITABLE state and return it.
 499
 500        :blockid:
 501          optional block identifier, otherwise one will be automatically assigned
 502
 503        :starting_capacity:
 504          optional capacity, otherwise will use default capacity
 505
 506        :owner:
 507          ArvadosFile that owns this block
 508
 509        """
 510        return self._alloc_bufferblock(blockid, starting_capacity, owner)
 511
 512    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
 513        if blockid is None:
 514            blockid = str(uuid.uuid4())
 515        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
 516        self._bufferblocks[bufferblock.blockid] = bufferblock
 517        return bufferblock
 518
 519    @synchronized
 520    def dup_block(self, block, owner):
 521        """Create a new bufferblock initialized with the content of an existing bufferblock.
 522
 523        :block:
 524          the buffer block to copy.
 525
 526        :owner:
 527          ArvadosFile that owns the new block
 528
 529        """
 530        new_blockid = str(uuid.uuid4())
 531        bufferblock = block.clone(new_blockid, owner)
 532        self._bufferblocks[bufferblock.blockid] = bufferblock
 533        return bufferblock
 534
 535    @synchronized
 536    def is_bufferblock(self, locator):
 537        return locator in self._bufferblocks
 538
 539    def _commit_bufferblock_worker(self):
 540        """Background uploader thread."""
 541
 542        while True:
 543            try:
 544                bufferblock = self._put_queue.get()
 545                if bufferblock is None:
 546                    return
 547
 548                if self.copies is None:
 549                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
 550                else:
 551                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
 552                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 553            except Exception as e:
 554                bufferblock.set_state(_BufferBlock.ERROR, e)
 555            finally:
 556                if self._put_queue is not None:
 557                    self._put_queue.task_done()
 558
 559    def start_put_threads(self):
 560        with self.threads_lock:
 561            if self._put_threads is None:
 562                # Start uploader threads.
 563
 564                # If we don't limit the Queue size, the upload queue can quickly
 565                # grow to take up gigabytes of RAM if the writing process is
 566                # generating data more quickly than it can be sent to the Keep
 567                # servers.
 568                #
 569                # With two upload threads and a queue size of 2, this means up to 4
 570                # blocks pending.  If they are full 64 MiB blocks, that means up to
 571                # 256 MiB of internal buffering, which is the same size as the
 572                # default download block cache in KeepClient.
 573                self._put_queue = queue.Queue(maxsize=2)
 574
 575                self._put_threads = []
 576                for i in range(0, self.num_put_threads):
 577                    thread = threading.Thread(target=self._commit_bufferblock_worker)
 578                    self._put_threads.append(thread)
 579                    thread.daemon = True
 580                    thread.start()
 581
 582    @synchronized
 583    def stop_threads(self):
 584        """Shut down and wait for background upload and download threads to finish."""
 585
 586        if self._put_threads is not None:
 587            for t in self._put_threads:
 588                self._put_queue.put(None)
 589            for t in self._put_threads:
 590                t.join()
 591        self._put_threads = None
 592        self._put_queue = None
 593
 594    def __enter__(self):
 595        return self
 596
 597    def __exit__(self, exc_type, exc_value, traceback):
 598        self.stop_threads()
 599
 600    @synchronized
 601    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
 602        """Packs small blocks together before uploading"""
 603
 604        self._pending_write_size += closed_file_size
 605
 606        # Check if there is enough data in small blocks to fill a full-size block
 607        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
 608            return
 609
 610        # Find blocks that are ready to be packed together before being
 611        # committed to Keep.
 612        # A WRITABLE block always has an owner.
 613        # A WRITABLE block whose owner is closed() implies that its
 614        # size is <= KEEP_BLOCK_SIZE/2.
 615        try:
 616            small_blocks = [b for b in self._bufferblocks.values()
 617                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
 618        except AttributeError:
 619            # Writable blocks without owner shouldn't exist.
 620            raise UnownedBlockError()
 621
 622        if len(small_blocks) <= 1:
 623            # Not enough small blocks for repacking
 624            return
 625
 626        for bb in small_blocks:
 627            bb.repack_writes()
 628
 629        # Update the pending write size count with its true value, just in case
 630        # some small file was opened, written and closed several times.
 631        self._pending_write_size = sum([b.size() for b in small_blocks])
 632
 633        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
 634            return
 635
 636        new_bb = self._alloc_bufferblock()
 637        new_bb.owner = []
 638        files = []
 639        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
 640            bb = small_blocks.pop(0)
 641            new_bb.owner.append(bb.owner)
 642            self._pending_write_size -= bb.size()
 643            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
 644            files.append((bb, new_bb.write_pointer - bb.size()))
 645
 646        self.commit_bufferblock(new_bb, sync=sync)
 647
 648        for bb, new_bb_segment_offset in files:
 649            newsegs = bb.owner.segments()
 650            for s in newsegs:
 651                if s.locator == bb.blockid:
 652                    s.locator = new_bb.blockid
 653                    s.segment_offset = new_bb_segment_offset+s.segment_offset
 654            bb.owner.set_segments(newsegs)
 655            self._delete_bufferblock(bb.blockid)
 656
 657    def commit_bufferblock(self, block, sync):
 658        """Initiate a background upload of a bufferblock.
 659
 660        :block:
 661          The block object to upload
 662
 663        :sync:
 664          If `sync` is True, upload the block synchronously.
 665          If `sync` is False, upload the block asynchronously.  This will
 666          return immediately unless the upload queue is at capacity, in
 667          which case it will wait on an upload queue slot.
 668
 669        """
 670        try:
 671            # Mark the block as PENDING to disallow any more appends.
 672            block.set_state(_BufferBlock.PENDING)
 673        except StateChangeError as e:
 674            if e.state == _BufferBlock.PENDING:
 675                if sync:
 676                    block.wait_for_commit.wait()
 677                else:
 678                    return
 679            if block.state() == _BufferBlock.COMMITTED:
 680                return
 681            elif block.state() == _BufferBlock.ERROR:
 682                raise block.error
 683            else:
 684                raise
 685
 686        if sync:
 687            try:
 688                if self.copies is None:
 689                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
 690                else:
 691                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
 692                block.set_state(_BufferBlock.COMMITTED, loc)
 693            except Exception as e:
 694                block.set_state(_BufferBlock.ERROR, e)
 695                raise
 696        else:
 697            self.start_put_threads()
 698            self._put_queue.put(block)
 699
 700    @synchronized
 701    def get_bufferblock(self, locator):
 702        return self._bufferblocks.get(locator)
 703
 704    @synchronized
 705    def get_padding_block(self):
 706        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
 707        when using truncate() to extend the size of a file.
 708
 709        For reference (and possible future optimization), the md5sum of the
 710        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
 711
 712        """
 713
 714        if self.padding_block is None:
 715            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
 716            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
 717            self.commit_bufferblock(self.padding_block, False)
 718        return self.padding_block
 719
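    # Sanity check for the locator quoted in the docstring above (illustrative;
    # assumes config.KEEP_BLOCK_SIZE is 64 MiB, i.e. 67108864 bytes):
    #
    #     import hashlib
    #     hashlib.md5(bytes(67108864)).hexdigest()
    #     # expected: '7f614da9329cd3aebf59b91aadc30bf0', per the docstring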
 720    @synchronized
 721    def delete_bufferblock(self, locator):
 722        self._delete_bufferblock(locator)
 723
 724    def _delete_bufferblock(self, locator):
 725        if locator in self._bufferblocks:
 726            bb = self._bufferblocks[locator]
 727            bb.clear()
 728            del self._bufferblocks[locator]
 729
 730    def get_block_contents(self, locator, num_retries, cache_only=False):
 731        """Fetch a block.
 732
 733        First checks whether the locator refers to a BufferBlock and, if so, returns
 734        its contents; otherwise passes the request through to KeepClient.get().
 735
 736        """
 737        with self.lock:
 738            if locator in self._bufferblocks:
 739                bufferblock = self._bufferblocks[locator]
 740                if bufferblock.state() != _BufferBlock.COMMITTED:
 741                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
 742                else:
 743                    locator = bufferblock._locator
 744        if cache_only:
 745            return self._keep.get_from_cache(locator)
 746        else:
 747            return self._keep.get(locator, num_retries=num_retries)
 748
 749    def commit_all(self):
 750        """Commit all outstanding buffer blocks.
 751
 752        This is a synchronous call, and will not return until all buffer blocks
 753        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 754
 755        """
 756        self.repack_small_blocks(force=True, sync=True)
 757
 758        with self.lock:
 759            items = list(self._bufferblocks.items())
 760
 761        for k,v in items:
 762            if v.state() != _BufferBlock.COMMITTED and v.owner:
 763                # Ignore blocks with a list of owners: if they're not in COMMITTED
 764                # state, they're already being committed asynchronously.
 765                if isinstance(v.owner, ArvadosFile):
 766                    v.owner.flush(sync=False)
 767
 768        with self.lock:
 769            if self._put_queue is not None:
 770                self._put_queue.join()
 771
 772                err = []
 773                for k,v in items:
 774                    if v.state() == _BufferBlock.ERROR:
 775                        err.append((v.locator(), v.error))
 776                if err:
 777                    raise KeepWriteError("Error writing some blocks", err, label="block")
 778
 779        for k,v in items:
 780            # flush again with sync=True to remove committed bufferblocks from
 781            # the segments.
 782            if v.owner:
 783                if isinstance(v.owner, ArvadosFile):
 784                    v.owner.flush(sync=True)
 785                elif isinstance(v.owner, list) and len(v.owner) > 0:
 786                    # This bufferblock is referenced by many files as a result
 787                    # of repacking small blocks, so don't delete it when flushing
 788                    # its owners, just do it after flushing them all.
 789                    for owner in v.owner:
 790                        owner.flush(sync=True)
 791                    self.delete_bufferblock(k)
 792
 793        self.stop_threads()
 794
 795    def block_prefetch(self, locator):
 796        """Initiate a background download of a block.
 797        """
 798
 799        if not self.prefetch_lookahead:
 800            return
 801
 802        with self.lock:
 803            if locator in self._bufferblocks:
 804                return
 805
 806        self._keep.block_prefetch(locator)
 807
 808
 809class ArvadosFile(object):
 810    """Represent a file in a Collection.
 811
 812    ArvadosFile manages the underlying representation of a file in Keep as a
 813    sequence of segments spanning a set of blocks, and implements random
 814    read/write access.
 815
 816    This object may be accessed from multiple threads.
 817
 818    """
 819
 820    __slots__ = ('parent', 'name', '_writers', '_committed',
 821                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 822
 823    def __init__(self, parent, name, stream=[], segments=[]):
 824        """
 825        ArvadosFile constructor.
 826
 827        :stream:
 828          a list of Range objects representing a block stream
 829
 830        :segments:
 831          a list of Range objects representing segments
 832        """
 833        self.parent = parent
 834        self.name = name
 835        self._writers = set()
 836        self._committed = False
 837        self._segments = []
 838        self.lock = parent.root_collection().lock
 839        for s in segments:
 840            self._add_segment(stream, s.locator, s.range_size)
 841        self._current_bblock = None
 842        self._read_counter = 0
 843
 844    def writable(self):
 845        return self.parent.writable()
 846
 847    @synchronized
 848    def permission_expired(self, as_of_dt=None):
 849        """Returns True if any of the segment's locators is expired"""
 850        for r in self._segments:
 851            if KeepLocator(r.locator).permission_expired(as_of_dt):
 852                return True
 853        return False
 854
 855    @synchronized
 856    def has_remote_blocks(self):
 857        """Returns True if any of the segment's locators has a +R signature"""
 858
 859        for s in self._segments:
 860            if '+R' in s.locator:
 861                return True
 862        return False
 863
 864    @synchronized
 865    def _copy_remote_blocks(self, remote_blocks={}):
 866        """Ask Keep to copy remote blocks and point to their local copies.
 867
 868        This is called from the parent Collection.
 869
 870        :remote_blocks:
 871            Shared cache of remote to local block mappings. This is used to avoid
 872            doing extra work when blocks are shared by more than one file in
 873            different subdirectories.
 874        """
 875
 876        for s in self._segments:
 877            if '+R' in s.locator:
 878                try:
 879                    loc = remote_blocks[s.locator]
 880                except KeyError:
 881                    loc = self.parent._my_keep().refresh_signature(s.locator)
 882                    remote_blocks[s.locator] = loc
 883                s.locator = loc
 884                self.parent.set_committed(False)
 885        return remote_blocks
 886
 887    @synchronized
 888    def segments(self):
 889        return copy.copy(self._segments)
 890
 891    @synchronized
 892    def clone(self, new_parent, new_name):
 893        """Make a copy of this file."""
 894        cp = ArvadosFile(new_parent, new_name)
 895        cp.replace_contents(self)
 896        return cp
 897
 898    @must_be_writable
 899    @synchronized
 900    def replace_contents(self, other):
 901        """Replace segments of this file with segments from another `ArvadosFile` object."""
 902
 903        map_loc = {}
 904        self._segments = []
 905        for other_segment in other.segments():
 906            new_loc = other_segment.locator
 907            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
 908                if other_segment.locator not in map_loc:
 909                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
 910                    if bufferblock.state() != _BufferBlock.WRITABLE:
 911                        map_loc[other_segment.locator] = bufferblock.locator()
 912                    else:
 913                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
 914                new_loc = map_loc[other_segment.locator]
 915
 916            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 917
 918        self.set_committed(False)
 919
 920    def __eq__(self, other):
 921        if other is self:
 922            return True
 923        if not isinstance(other, ArvadosFile):
 924            return False
 925
 926        othersegs = other.segments()
 927        with self.lock:
 928            if len(self._segments) != len(othersegs):
 929                return False
 930            for i in range(0, len(othersegs)):
 931                seg1 = self._segments[i]
 932                seg2 = othersegs[i]
 933                loc1 = seg1.locator
 934                loc2 = seg2.locator
 935
 936                if self.parent._my_block_manager().is_bufferblock(loc1):
 937                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
 938
 939                if other.parent._my_block_manager().is_bufferblock(loc2):
 940                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
 941
 942                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
 943                    seg1.range_start != seg2.range_start or
 944                    seg1.range_size != seg2.range_size or
 945                    seg1.segment_offset != seg2.segment_offset):
 946                    return False
 947
 948        return True
 949
 950    def __ne__(self, other):
 951        return not self.__eq__(other)
 952
 953    @synchronized
 954    def set_segments(self, segs):
 955        self._segments = segs
 956
 957    @synchronized
 958    def set_committed(self, value=True):
 959        """Set committed flag.
 960
 961        If value is True, set committed to be True.
 962
 963        If value is False, set committed to be False for this and all parents.
 964        """
 965        if value == self._committed:
 966            return
 967        self._committed = value
 968        if self._committed is False and self.parent is not None:
 969            self.parent.set_committed(False)
 970
 971    @synchronized
 972    def committed(self):
 973        """Get whether this is committed or not."""
 974        return self._committed
 975
 976    @synchronized
 977    def add_writer(self, writer):
 978        """Add an ArvadosFileWriter reference to the list of writers"""
 979        if isinstance(writer, ArvadosFileWriter):
 980            self._writers.add(writer)
 981
 982    @synchronized
 983    def remove_writer(self, writer, flush):
 984        """
 985        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 986        and do some block maintenance tasks.
 987        """
 988        self._writers.remove(writer)
 989
 990        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 991            # File writer closed, not small enough for repacking
 992            self.flush()
 993        elif self.closed():
 994            # All writers closed and size is adequate for repacking
 995            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 996
 997    def closed(self):
 998        """
 999        Get whether this is closed or not. When the writers list is empty, the file
1000        is considered closed.
1001        """
1002        return len(self._writers) == 0
1003
1004    @must_be_writable
1005    @synchronized
1006    def truncate(self, size):
1007        """Shrink or expand the size of the file.
1008
1009        If `size` is less than the size of the file, the file contents after
1010        `size` will be discarded.  If `size` is greater than the current size
1011        of the file, it will be filled with zero bytes.
1012
1013        """
1014        if size < self.size():
1015            new_segs = []
1016            for r in self._segments:
1017                range_end = r.range_start+r.range_size
1018                if r.range_start >= size:
1019                    # segment is past the truncate size, all done
1020                    break
1021                elif size < range_end:
1022                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1023                    nr.segment_offset = r.segment_offset
1024                    new_segs.append(nr)
1025                    break
1026                else:
1027                    new_segs.append(r)
1028
1029            self._segments = new_segs
1030            self.set_committed(False)
1031        elif size > self.size():
1032            padding = self.parent._my_block_manager().get_padding_block()
1033            diff = size - self.size()
1034            while diff > config.KEEP_BLOCK_SIZE:
1035                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1036                diff -= config.KEEP_BLOCK_SIZE
1037            if diff > 0:
1038                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1039            self.set_committed(False)
1040        else:
1041            # size == self.size()
1042            pass
1043
1044    def readfrom(self, offset, size, num_retries, exact=False):
1045        """Read up to `size` bytes from the file starting at `offset`.
1046
1047        :exact:
1048         If False (default), return less data than requested if the read
1049         crosses a block boundary and the next block isn't cached.  If True,
1050         only return less data than requested when hitting EOF.
1051        """
1052
1053        with self.lock:
1054            if size == 0 or offset >= self.size():
1055                return b''
1056            readsegs = locators_and_ranges(self._segments, offset, size)
1057
1058            prefetch = None
1059            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1060            if prefetch_lookahead:
1061                # Doing prefetch on every read() call is surprisingly expensive
1062                # when we're trying to deliver data at 600+ MiBps and want
1063                # the read() fast path to be as lightweight as possible.
1064                #
1065                # Only prefetching every 128 read operations
1066                # dramatically reduces the overhead while still
1067                # getting the benefit of prefetching (e.g. when
1068                # reading 128 KiB at a time, it checks for prefetch
1069                # every 16 MiB).
1070                self._read_counter = (self._read_counter+1) % 128
1071                if self._read_counter == 1:
1072                    prefetch = locators_and_ranges(self._segments,
1073                                                   offset + size,
1074                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1075                                                   limit=(1+prefetch_lookahead))
1076
1077        locs = set()
1078        data = []
1079        for lr in readsegs:
1080            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1081            if block:
1082                blockview = memoryview(block)
1083                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1084                locs.add(lr.locator)
1085            else:
1086                break
1087
1088        if prefetch:
1089            for lr in prefetch:
1090                if lr.locator not in locs:
1091                    self.parent._my_block_manager().block_prefetch(lr.locator)
1092                    locs.add(lr.locator)
1093
1094        if len(data) == 1:
1095            return data[0]
1096        else:
1097            return b''.join(data)
1098
1099    @must_be_writable
1100    @synchronized
1101    def writeto(self, offset, data, num_retries):
1102        """Write `data` to the file starting at `offset`.
1103
1104        This will update existing bytes and/or extend the size of the file as
1105        necessary.
1106
1107        """
1108        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1109            data = data.encode()
1110        if len(data) == 0:
1111            return
1112
1113        if offset > self.size():
1114            self.truncate(offset)
1115
1116        if len(data) > config.KEEP_BLOCK_SIZE:
1117            # Chunk it up into smaller writes
1118            n = 0
1119            dataview = memoryview(data)
1120            while n < len(data):
1121                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1122                n += config.KEEP_BLOCK_SIZE
1123            return
1124
1125        self.set_committed(False)
1126
1127        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1128            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1129
1130        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1131            self._current_bblock.repack_writes()
1132            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1133                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1134                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1135
1136        self._current_bblock.append(data)
1137
1138        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1139
1140        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1141
1142        return len(data)
1143
1144    @synchronized
1145    def flush(self, sync=True, num_retries=0):
1146        """Flush the current bufferblock to Keep.
1147
1148        :sync:
1149          If True, commit block synchronously, wait until buffer block has been written.
1150          If False, commit block asynchronously, return immediately after putting block into
1151          the keep put queue.
1152        """
1153        if self.committed():
1154            return
1155
1156        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1157            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1158                self._current_bblock.repack_writes()
1159            if self._current_bblock.state() != _BufferBlock.DELETED:
1160                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1161
1162        if sync:
1163            to_delete = set()
1164            for s in self._segments:
1165                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1166                if bb:
1167                    if bb.state() != _BufferBlock.COMMITTED:
1168                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1169                    to_delete.add(s.locator)
1170                    s.locator = bb.locator()
1171            for s in to_delete:
1172                # Don't delete the bufferblock if it's owned by many files. It'll be
1173                # deleted after all of its owners are flush()ed.
1174                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1175                    self.parent._my_block_manager().delete_bufferblock(s)
1176
1177        self.parent.notify(MOD, self.parent, self.name, (self, self))
1178
1179    @must_be_writable
1180    @synchronized
1181    def add_segment(self, blocks, pos, size):
1182        """Add a segment to the end of the file.
1183
1184        `pos` and `size` reference a section of the stream described by
1185        `blocks` (a list of Range objects)
1186
1187        """
1188        self._add_segment(blocks, pos, size)
1189
1190    def _add_segment(self, blocks, pos, size):
1191        """Internal implementation of add_segment."""
1192        self.set_committed(False)
1193        for lr in locators_and_ranges(blocks, pos, size):
1194            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1195            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1196            self._segments.append(r)
1197
1198    @synchronized
1199    def size(self):
1200        """Get the file size."""
1201        if self._segments:
1202            n = self._segments[-1]
1203            return n.range_start + n.range_size
1204        else:
1205            return 0
1206
1207    @synchronized
1208    def manifest_text(self, stream_name=".", portable_locators=False,
1209                      normalize=False, only_committed=False):
1210        buf = ""
1211        filestream = []
1212        for segment in self._segments:
1213            loc = segment.locator
1214            if self.parent._my_block_manager().is_bufferblock(loc):
1215                if only_committed:
1216                    continue
1217                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1218            if portable_locators:
1219                loc = KeepLocator(loc).stripped()
1220            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1221                                 segment.segment_offset, segment.range_size))
1222        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1223        buf += "\n"
1224        return buf
1225
1226    @must_be_writable
1227    @synchronized
1228    def _reparent(self, newparent, newname):
1229        self.set_committed(False)
1230        self.flush(sync=True)
1231        self.parent.remove(self.name)
1232        self.parent = newparent
1233        self.name = newname
1234        self.lock = self.parent.root_collection().lock
1235
1236
1237class ArvadosFileReader(ArvadosFileReaderBase):
1238    """Wraps ArvadosFile in a file-like object supporting reading only.
1239
1240    Be aware that this class is NOT thread safe as there is no locking around
1241    updating the file pointer.
1242
1243    """
1244
1245    def __init__(self, arvadosfile, mode="r", num_retries=None):
1246        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1247        self.arvadosfile = arvadosfile
1248
1249    def size(self):
1250        return self.arvadosfile.size()
1251
1252    def stream_name(self):
1253        return self.arvadosfile.parent.stream_name()
1254
1255    def readinto(self, b):
1256        data = self.read(len(b))
1257        b[:len(data)] = data
1258        return len(data)
1259
1260    @_FileLikeObjectBase._before_close
1261    @retry_method
1262    def read(self, size=None, num_retries=None):
1263        """Read up to `size` bytes from the file and return the result.
1264
1265        Starts at the current file position.  If `size` is None, read the
1266        entire remainder of the file.
1267        """
1268        if size is None:
1269            data = []
1270            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1271            while rd:
1272                data.append(rd)
1273                self._filepos += len(rd)
1274                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1275            return b''.join(data)
1276        else:
1277            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1278            self._filepos += len(data)
1279            return data
1280
1281    @_FileLikeObjectBase._before_close
1282    @retry_method
1283    def readfrom(self, offset, size, num_retries=None):
1284        """Read up to `size` bytes from the stream, starting at the specified file offset.
1285
1286        This method does not change the file position.
1287        """
1288        return self.arvadosfile.readfrom(offset, size, num_retries)
1289
1290    def flush(self):
1291        pass
1292
1293
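# Minimal usage sketch for ArvadosFileReader above.  Assumptions: the UUID and
# file name are placeholders, and CollectionReader comes from
# arvados.collection; opening in binary mode is expected to yield an
# ArvadosFileReader.
#
#     import arvados.collection
#     coll = arvados.collection.CollectionReader('zzzzz-4zz18-xxxxxxxxxxxxxxx')
#     with coll.open('foo.txt', 'rb') as reader:
#         first_kb = reader.read(1024)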
1294class ArvadosFileWriter(ArvadosFileReader):
1295    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1296
1297    Be aware that this class is NOT thread safe as there is no locking around
1298    updating the file pointer.
1299
1300    """
1301
1302    def __init__(self, arvadosfile, mode, num_retries=None):
1303        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1304        self.arvadosfile.add_writer(self)
1305
1306    def writable(self):
1307        return True
1308
1309    @_FileLikeObjectBase._before_close
1310    @retry_method
1311    def write(self, data, num_retries=None):
1312        if self.mode[0] == "a":
1313            self._filepos = self.size()
1314        self.arvadosfile.writeto(self._filepos, data, num_retries)
1315        self._filepos += len(data)
1316        return len(data)
1317
1318    @_FileLikeObjectBase._before_close
1319    @retry_method
1320    def writelines(self, seq, num_retries=None):
1321        for s in seq:
1322            self.write(s, num_retries=num_retries)
1323
1324    @_FileLikeObjectBase._before_close
1325    def truncate(self, size=None):
1326        if size is None:
1327            size = self._filepos
1328        self.arvadosfile.truncate(size)
1329
1330    @_FileLikeObjectBase._before_close
1331    def flush(self):
1332        self.arvadosfile.flush()
1333
1334    def close(self, flush=True):
1335        if not self.closed:
1336            self.arvadosfile.remove_writer(self, flush)
1337            super(ArvadosFileWriter, self).close()
1338
1339
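# Minimal usage sketch for ArvadosFileWriter above.  Assumptions: an empty,
# in-memory Collection from arvados.collection and an arbitrary file name;
# opening in binary write mode is expected to yield an ArvadosFileWriter.
#
#     import arvados.collection
#     coll = arvados.collection.Collection()
#     with coll.open('out.txt', 'wb') as writer:
#         writer.write(b'hello\n')
#     manifest = coll.manifest_text()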
1340class WrappableFile(object):
1341    """An interface to an Arvados file that's compatible with io wrappers.
1342
1343    """
1344    def __init__(self, f):
1345        self.f = f
1346        self.closed = False
1347    def close(self):
1348        self.closed = True
1349        return self.f.close()
1350    def flush(self):
1351        return self.f.flush()
1352    def read(self, *args, **kwargs):
1353        return self.f.read(*args, **kwargs)
1354    def readable(self):
1355        return self.f.readable()
1356    def readinto(self, *args, **kwargs):
1357        return self.f.readinto(*args, **kwargs)
1358    def seek(self, *args, **kwargs):
1359        return self.f.seek(*args, **kwargs)
1360    def seekable(self):
1361        return self.f.seekable()
1362    def tell(self):
1363        return self.f.tell()
1364    def writable(self):
1365        return self.f.writable()
1366    def write(self, *args, **kwargs):
1367        return self.f.write(*args, **kwargs)
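# Illustrative sketch of why WrappableFile exists: it gives io wrappers such
# as io.TextIOWrapper a conventional file object to wrap.  Assumption: 'reader'
# is an open ArvadosFileReader.
#
#     import io
#     text_reader = io.TextIOWrapper(WrappableFile(reader), encoding='utf-8')
#     for line in text_reader:
#         print(line, end='')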
113    def readable(self):
114        return True
def writable(self):
116    def writable(self):
117        return False
def seekable(self):
119    def seekable(self):
120        return True
@retry_method
def readall(self, size=1048576, num_retries=None):
122    @_FileLikeObjectBase._before_close
123    @retry_method
124    def readall(self, size=2**20, num_retries=None):
125        while True:
126            data = self.read(size, num_retries=num_retries)
127            if len(data) == 0:
128                break
129            yield data
@retry_method
def readline(self, size=inf, num_retries=None):
131    @_FileLikeObjectBase._before_close
132    @retry_method
133    def readline(self, size=float('inf'), num_retries=None):
134        cache_pos, cache_data = self._readline_cache
135        if self.tell() == cache_pos:
136            data = [cache_data]
137            self._filepos += len(cache_data)
138        else:
139            data = [b'']
140        data_size = len(data[-1])
141        while (data_size < size) and (b'\n' not in data[-1]):
142            next_read = self.read(2 ** 20, num_retries=num_retries)
143            if not next_read:
144                break
145            data.append(next_read)
146            data_size += len(next_read)
147        data = b''.join(data)
148        try:
149            nextline_index = data.index(b'\n') + 1
150        except ValueError:
151            nextline_index = len(data)
152        nextline_index = min(nextline_index, size)
153        self._filepos -= len(data) - nextline_index
154        self._readline_cache = (self.tell(), data[nextline_index:])
155        return data[:nextline_index].decode()
@retry_method
def decompress(self, decompress, size, num_retries=None):
157    @_FileLikeObjectBase._before_close
158    @retry_method
159    def decompress(self, decompress, size, num_retries=None):
160        for segment in self.readall(size, num_retries=num_retries):
161            data = decompress(segment)
162            if data:
163                yield data
@retry_method
def readall_decompressed(self, size=1048576, num_retries=None):
165    @_FileLikeObjectBase._before_close
166    @retry_method
167    def readall_decompressed(self, size=2**20, num_retries=None):
168        self.seek(0)
169        if self.name.endswith('.bz2'):
170            dc = bz2.BZ2Decompressor()
171            return self.decompress(dc.decompress, size,
172                                   num_retries=num_retries)
173        elif self.name.endswith('.gz'):
174            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
175            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
176                                   size, num_retries=num_retries)
177        else:
178            return self.readall(size, num_retries=num_retries)
@retry_method
def readlines(self, sizehint=inf, num_retries=None):
180    @_FileLikeObjectBase._before_close
181    @retry_method
182    def readlines(self, sizehint=float('inf'), num_retries=None):
183        data = []
184        data_size = 0
185        for s in self.readall(num_retries=num_retries):
186            data.append(s)
187            data_size += len(s)
188            if data_size >= sizehint:
189                break
190        return b''.join(data).decode().splitlines(True)
def size(self):
192    def size(self):
193        raise IOError(errno.ENOSYS, "Not implemented")
def read(self, size, num_retries=None):
195    def read(self, size, num_retries=None):
196        raise IOError(errno.ENOSYS, "Not implemented")
def readfrom(self, start, size, num_retries=None):
198    def readfrom(self, start, size, num_retries=None):
199        raise IOError(errno.ENOSYS, "Not implemented")
class StreamFileReader(ArvadosFileReaderBase):
202class StreamFileReader(ArvadosFileReaderBase):
203    class _NameAttribute(str):
204        # The Python file API provides a plain .name attribute.
205        # Older SDK provided a name() method.
206        # This class provides both, for maximum compatibility.
207        def __call__(self):
208            return self
209
210    def __init__(self, stream, segments, name):
211        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
212        self._stream = stream
213        self.segments = segments
214
215    def stream_name(self):
216        return self._stream.name()
217
218    def size(self):
219        n = self.segments[-1]
220        return n.range_start + n.range_size
221
222    @_FileLikeObjectBase._before_close
223    @retry_method
224    def read(self, size, num_retries=None):
225        """Read up to 'size' bytes from the stream, starting at the current file position"""
226        if size == 0:
227            return b''
228
229        data = b''
230        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
231        if available_chunks:
232            lr = available_chunks[0]
233            data = self._stream.readfrom(lr.locator+lr.segment_offset,
234                                         lr.segment_size,
235                                         num_retries=num_retries)
236
237        self._filepos += len(data)
238        return data
239
240    @_FileLikeObjectBase._before_close
241    @retry_method
242    def readfrom(self, start, size, num_retries=None):
243        """Read up to 'size' bytes from the stream, starting at 'start'"""
244        if size == 0:
245            return b''
246
247        data = []
248        for lr in locators_and_ranges(self.segments, start, size):
249            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
250                                              num_retries=num_retries))
251        return b''.join(data)
252
253    def as_manifest(self):
254        segs = []
255        for r in self.segments:
256            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
257        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
StreamFileReader(stream, segments, name)
210    def __init__(self, stream, segments, name):
211        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
212        self._stream = stream
213        self.segments = segments
segments
def stream_name(self):
215    def stream_name(self):
216        return self._stream.name()
def size(self):
218    def size(self):
219        n = self.segments[-1]
220        return n.range_start + n.range_size
@retry_method
def read(self, size, num_retries=None):
222    @_FileLikeObjectBase._before_close
223    @retry_method
224    def read(self, size, num_retries=None):
225        """Read up to 'size' bytes from the stream, starting at the current file position"""
226        if size == 0:
227            return b''
228
229        data = b''
230        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
231        if available_chunks:
232            lr = available_chunks[0]
233            data = self._stream.readfrom(lr.locator+lr.segment_offset,
234                                         lr.segment_size,
235                                         num_retries=num_retries)
236
237        self._filepos += len(data)
238        return data

Read up to ‘size’ bytes from the stream, starting at the current file position

@retry_method
def readfrom(self, start, size, num_retries=None):
240    @_FileLikeObjectBase._before_close
241    @retry_method
242    def readfrom(self, start, size, num_retries=None):
243        """Read up to 'size' bytes from the stream, starting at 'start'"""
244        if size == 0:
245            return b''
246
247        data = []
248        for lr in locators_and_ranges(self.segments, start, size):
249            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
250                                              num_retries=num_retries))
251        return b''.join(data)

Read up to ‘size’ bytes from the stream, starting at ‘start’

def as_manifest(self):
253    def as_manifest(self):
254        segs = []
255        for r in self.segments:
256            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
257        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
def synchronized(orig_func):
260def synchronized(orig_func):
261    @functools.wraps(orig_func)
262    def synchronized_wrapper(self, *args, **kwargs):
263        with self.lock:
264            return orig_func(self, *args, **kwargs)
265    return synchronized_wrapper
class StateChangeError(builtins.Exception):
268class StateChangeError(Exception):
269    def __init__(self, message, state, nextstate):
270        super(StateChangeError, self).__init__(message)
271        self.state = state
272        self.nextstate = nextstate

Common base class for all non-exit exceptions.

StateChangeError(message, state, nextstate)
269    def __init__(self, message, state, nextstate):
270        super(StateChangeError, self).__init__(message)
271        self.state = state
272        self.nextstate = nextstate
state
nextstate
Inherited Members
builtins.BaseException
with_traceback
args
class NoopLock:
444class NoopLock(object):
445    def __enter__(self):
446        return self
447
448    def __exit__(self, exc_type, exc_value, traceback):
449        pass
450
451    def acquire(self, blocking=False):
452        pass
453
454    def release(self):
455        pass
def acquire(self, blocking=False):
451    def acquire(self, blocking=False):
452        pass
def release(self):
454    def release(self):
455        pass
def must_be_writable(orig_func):
458def must_be_writable(orig_func):
459    @functools.wraps(orig_func)
460    def must_be_writable_wrapper(self, *args, **kwargs):
461        if not self.writable():
462            raise IOError(errno.EROFS, "Collection is read-only.")
463        return orig_func(self, *args, **kwargs)
464    return must_be_writable_wrapper
class ArvadosFile:
 810class ArvadosFile(object):
 811    """Represent a file in a Collection.
 812
 813    ArvadosFile manages the underlying representation of a file in Keep as a
 814    sequence of segments spanning a set of blocks, and implements random
 815    read/write access.
 816
 817    This object may be accessed from multiple threads.
 818
 819    """
 820
 821    __slots__ = ('parent', 'name', '_writers', '_committed',
 822                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 823
 824    def __init__(self, parent, name, stream=[], segments=[]):
 825        """
 826        ArvadosFile constructor.
 827
 828        :stream:
 829          a list of Range objects representing a block stream
 830
 831        :segments:
 832          a list of Range objects representing segments
 833        """
 834        self.parent = parent
 835        self.name = name
 836        self._writers = set()
 837        self._committed = False
 838        self._segments = []
 839        self.lock = parent.root_collection().lock
 840        for s in segments:
 841            self._add_segment(stream, s.locator, s.range_size)
 842        self._current_bblock = None
 843        self._read_counter = 0
 844
 845    def writable(self):
 846        return self.parent.writable()
 847
 848    @synchronized
 849    def permission_expired(self, as_of_dt=None):
 850        """Returns True if any of the segment's locators is expired"""
 851        for r in self._segments:
 852            if KeepLocator(r.locator).permission_expired(as_of_dt):
 853                return True
 854        return False
 855
 856    @synchronized
 857    def has_remote_blocks(self):
 858        """Returns True if any of the segment's locators has a +R signature"""
 859
 860        for s in self._segments:
 861            if '+R' in s.locator:
 862                return True
 863        return False
 864
 865    @synchronized
 866    def _copy_remote_blocks(self, remote_blocks={}):
 867        """Ask Keep to copy remote blocks and point to their local copies.
 868
 869        This is called from the parent Collection.
 870
 871        :remote_blocks:
 872            Shared cache of remote to local block mappings. This is used to avoid
 873            doing extra work when blocks are shared by more than one file in
 874            different subdirectories.
 875        """
 876
 877        for s in self._segments:
 878            if '+R' in s.locator:
 879                try:
 880                    loc = remote_blocks[s.locator]
 881                except KeyError:
 882                    loc = self.parent._my_keep().refresh_signature(s.locator)
 883                    remote_blocks[s.locator] = loc
 884                s.locator = loc
 885                self.parent.set_committed(False)
 886        return remote_blocks
 887
 888    @synchronized
 889    def segments(self):
 890        return copy.copy(self._segments)
 891
 892    @synchronized
 893    def clone(self, new_parent, new_name):
 894        """Make a copy of this file."""
 895        cp = ArvadosFile(new_parent, new_name)
 896        cp.replace_contents(self)
 897        return cp
 898
 899    @must_be_writable
 900    @synchronized
 901    def replace_contents(self, other):
 902        """Replace segments of this file with segments from another `ArvadosFile` object."""
 903
 904        map_loc = {}
 905        self._segments = []
 906        for other_segment in other.segments():
 907            new_loc = other_segment.locator
 908            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
 909                if other_segment.locator not in map_loc:
 910                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
 911                    if bufferblock.state() != _BufferBlock.WRITABLE:
 912                        map_loc[other_segment.locator] = bufferblock.locator()
 913                    else:
 914                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
 915                new_loc = map_loc[other_segment.locator]
 916
 917            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 918
 919        self.set_committed(False)
 920
 921    def __eq__(self, other):
 922        if other is self:
 923            return True
 924        if not isinstance(other, ArvadosFile):
 925            return False
 926
 927        othersegs = other.segments()
 928        with self.lock:
 929            if len(self._segments) != len(othersegs):
 930                return False
 931            for i in range(0, len(othersegs)):
 932                seg1 = self._segments[i]
 933                seg2 = othersegs[i]
 934                loc1 = seg1.locator
 935                loc2 = seg2.locator
 936
 937                if self.parent._my_block_manager().is_bufferblock(loc1):
 938                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
 939
 940                if other.parent._my_block_manager().is_bufferblock(loc2):
 941                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
 942
 943                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
 944                    seg1.range_start != seg2.range_start or
 945                    seg1.range_size != seg2.range_size or
 946                    seg1.segment_offset != seg2.segment_offset):
 947                    return False
 948
 949        return True
 950
 951    def __ne__(self, other):
 952        return not self.__eq__(other)
 953
 954    @synchronized
 955    def set_segments(self, segs):
 956        self._segments = segs
 957
 958    @synchronized
 959    def set_committed(self, value=True):
 960        """Set committed flag.
 961
 962        If value is True, set committed to be True.
 963
 964        If value is False, set committed to be False for this and all parents.
 965        """
 966        if value == self._committed:
 967            return
 968        self._committed = value
 969        if self._committed is False and self.parent is not None:
 970            self.parent.set_committed(False)
 971
 972    @synchronized
 973    def committed(self):
 974        """Get whether this is committed or not."""
 975        return self._committed
 976
 977    @synchronized
 978    def add_writer(self, writer):
 979        """Add an ArvadosFileWriter reference to the list of writers"""
 980        if isinstance(writer, ArvadosFileWriter):
 981            self._writers.add(writer)
 982
 983    @synchronized
 984    def remove_writer(self, writer, flush):
 985        """
 986        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
 987        and do some block maintenance tasks.
 988        """
 989        self._writers.remove(writer)
 990
 991        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
 992            # File writer closed, not small enough for repacking
 993            self.flush()
 994        elif self.closed():
 995            # All writers closed and size is adequate for repacking
 996            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 997
 998    def closed(self):
 999        """
1000        Get whether this is closed or not. When the writers list is empty, the file
1001        is supposed to be closed.
1002        """
1003        return len(self._writers) == 0
1004
1005    @must_be_writable
1006    @synchronized
1007    def truncate(self, size):
1008        """Shrink or expand the size of the file.
1009
1010        If `size` is less than the size of the file, the file contents after
1011        `size` will be discarded.  If `size` is greater than the current size
1012        of the file, it will be filled with zero bytes.
1013
1014        """
1015        if size < self.size():
1016            new_segs = []
1017            for r in self._segments:
1018                range_end = r.range_start+r.range_size
1019                if r.range_start >= size:
1020                    # segment is past the truncate size, all done
1021                    break
1022                elif size < range_end:
1023                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1024                    nr.segment_offset = r.segment_offset
1025                    new_segs.append(nr)
1026                    break
1027                else:
1028                    new_segs.append(r)
1029
1030            self._segments = new_segs
1031            self.set_committed(False)
1032        elif size > self.size():
1033            padding = self.parent._my_block_manager().get_padding_block()
1034            diff = size - self.size()
1035            while diff > config.KEEP_BLOCK_SIZE:
1036                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1037                diff -= config.KEEP_BLOCK_SIZE
1038            if diff > 0:
1039                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1040            self.set_committed(False)
1041        else:
1042            # size == self.size()
1043            pass
1044
1045    def readfrom(self, offset, size, num_retries, exact=False):
1046        """Read up to `size` bytes from the file starting at `offset`.
1047
1048        :exact:
1049         If False (default), return less data than requested if the read
1050         crosses a block boundary and the next block isn't cached.  If True,
1051         only return less data than requested when hitting EOF.
1052        """
1053
1054        with self.lock:
1055            if size == 0 or offset >= self.size():
1056                return b''
1057            readsegs = locators_and_ranges(self._segments, offset, size)
1058
1059            prefetch = None
1060            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1061            if prefetch_lookahead:
1062                # Doing prefetch on every read() call is surprisingly expensive
1063                # when we're trying to deliver data at 600+ MiBps and want
1064                # the read() fast path to be as lightweight as possible.
1065                #
1066                # Only prefetching every 128 read operations
1067                # dramatically reduces the overhead while still
1068                # getting the benefit of prefetching (e.g. when
1069                # reading 128 KiB at a time, it checks for prefetch
1070                # every 16 MiB).
1071                self._read_counter = (self._read_counter+1) % 128
1072                if self._read_counter == 1:
1073                    prefetch = locators_and_ranges(self._segments,
1074                                                   offset + size,
1075                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1076                                                   limit=(1+prefetch_lookahead))
1077
1078        locs = set()
1079        data = []
1080        for lr in readsegs:
1081            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1082            if block:
1083                blockview = memoryview(block)
1084                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1085                locs.add(lr.locator)
1086            else:
1087                break
1088
1089        if prefetch:
1090            for lr in prefetch:
1091                if lr.locator not in locs:
1092                    self.parent._my_block_manager().block_prefetch(lr.locator)
1093                    locs.add(lr.locator)
1094
1095        if len(data) == 1:
1096            return data[0]
1097        else:
1098            return b''.join(data)
1099
1100    @must_be_writable
1101    @synchronized
1102    def writeto(self, offset, data, num_retries):
1103        """Write `data` to the file starting at `offset`.
1104
1105        This will update existing bytes and/or extend the size of the file as
1106        necessary.
1107
1108        """
1109        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1110            data = data.encode()
1111        if len(data) == 0:
1112            return
1113
1114        if offset > self.size():
1115            self.truncate(offset)
1116
1117        if len(data) > config.KEEP_BLOCK_SIZE:
1118            # Chunk it up into smaller writes
1119            n = 0
1120            dataview = memoryview(data)
1121            while n < len(data):
1122                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1123                n += config.KEEP_BLOCK_SIZE
1124            return
1125
1126        self.set_committed(False)
1127
1128        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1129            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1130
1131        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1132            self._current_bblock.repack_writes()
1133            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1134                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1135                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1136
1137        self._current_bblock.append(data)
1138
1139        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1140
1141        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1142
1143        return len(data)
1144
1145    @synchronized
1146    def flush(self, sync=True, num_retries=0):
1147        """Flush the current bufferblock to Keep.
1148
1149        :sync:
1150          If True, commit block synchronously, wait until buffer block has been written.
1151          If False, commit block asynchronously, return immediately after putting block into
1152          the keep put queue.
1153        """
1154        if self.committed():
1155            return
1156
1157        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1158            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1159                self._current_bblock.repack_writes()
1160            if self._current_bblock.state() != _BufferBlock.DELETED:
1161                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1162
1163        if sync:
1164            to_delete = set()
1165            for s in self._segments:
1166                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1167                if bb:
1168                    if bb.state() != _BufferBlock.COMMITTED:
1169                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1170                    to_delete.add(s.locator)
1171                    s.locator = bb.locator()
1172            for s in to_delete:
1173                # Don't delete the bufferblock if it's owned by many files. It'll be
1174                # deleted after all of its owners are flush()ed.
1175                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1176                    self.parent._my_block_manager().delete_bufferblock(s)
1177
1178        self.parent.notify(MOD, self.parent, self.name, (self, self))
1179
1180    @must_be_writable
1181    @synchronized
1182    def add_segment(self, blocks, pos, size):
1183        """Add a segment to the end of the file.
1184
1185        `pos` and `size` reference a section of the stream described by
1186        `blocks` (a list of Range objects)
1187
1188        """
1189        self._add_segment(blocks, pos, size)
1190
1191    def _add_segment(self, blocks, pos, size):
1192        """Internal implementation of add_segment."""
1193        self.set_committed(False)
1194        for lr in locators_and_ranges(blocks, pos, size):
1195            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1196            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1197            self._segments.append(r)
1198
1199    @synchronized
1200    def size(self):
1201        """Get the file size."""
1202        if self._segments:
1203            n = self._segments[-1]
1204            return n.range_start + n.range_size
1205        else:
1206            return 0
1207
1208    @synchronized
1209    def manifest_text(self, stream_name=".", portable_locators=False,
1210                      normalize=False, only_committed=False):
1211        buf = ""
1212        filestream = []
1213        for segment in self._segments:
1214            loc = segment.locator
1215            if self.parent._my_block_manager().is_bufferblock(loc):
1216                if only_committed:
1217                    continue
1218                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1219            if portable_locators:
1220                loc = KeepLocator(loc).stripped()
1221            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1222                                 segment.segment_offset, segment.range_size))
1223        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1224        buf += "\n"
1225        return buf
1226
1227    @must_be_writable
1228    @synchronized
1229    def _reparent(self, newparent, newname):
1230        self.set_committed(False)
1231        self.flush(sync=True)
1232        self.parent.remove(self.name)
1233        self.parent = newparent
1234        self.name = newname
1235        self.lock = self.parent.root_collection().lock

Represent a file in a Collection.

ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.

This object may be accessed from multiple threads.
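
ArvadosFile objects are normally created and owned by a Collection rather than constructed directly. A minimal sketch of the usual round trip through arvados.collection.Collection, assuming a configured Arvados client environment (ARVADOS_API_HOST/ARVADOS_API_TOKEN); the file name is illustrative:

import arvados.collection

# Collection.open() returns a file-like object backed by the collection's
# underlying ArvadosFile; writes go to buffer blocks until they are flushed.
coll = arvados.collection.Collection()
with coll.open('notes.txt', 'w') as writer:
    writer.write('hello arvfile\n')
with coll.open('notes.txt', 'r') as reader:
    print(reader.read())   # 'hello arvfile\n'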

ArvadosFile(parent, name, stream=[], segments=[])
824    def __init__(self, parent, name, stream=[], segments=[]):
825        """
826        ArvadosFile constructor.
827
828        :stream:
829          a list of Range objects representing a block stream
830
831        :segments:
832          a list of Range objects representing segments
833        """
834        self.parent = parent
835        self.name = name
836        self._writers = set()
837        self._committed = False
838        self._segments = []
839        self.lock = parent.root_collection().lock
840        for s in segments:
841            self._add_segment(stream, s.locator, s.range_size)
842        self._current_bblock = None
843        self._read_counter = 0

ArvadosFile constructor.

:stream: a list of Range objects representing a block stream

:segments: a list of Range objects representing segments

parent
name
lock
def writable(self):
845    def writable(self):
846        return self.parent.writable()
@synchronized
def permission_expired(self, as_of_dt=None):
848    @synchronized
849    def permission_expired(self, as_of_dt=None):
850        """Returns True if any of the segment's locators is expired"""
851        for r in self._segments:
852            if KeepLocator(r.locator).permission_expired(as_of_dt):
853                return True
854        return False

Returns True if any of the segments’ locators has expired

@synchronized
def has_remote_blocks(self):
856    @synchronized
857    def has_remote_blocks(self):
858        """Returns True if any of the segment's locators has a +R signature"""
859
860        for s in self._segments:
861            if '+R' in s.locator:
862                return True
863        return False

Returns True if any of the segments’ locators has a +R signature

@synchronized
def segments(self):
888    @synchronized
889    def segments(self):
890        return copy.copy(self._segments)
@synchronized
def clone(self, new_parent, new_name):
892    @synchronized
893    def clone(self, new_parent, new_name):
894        """Make a copy of this file."""
895        cp = ArvadosFile(new_parent, new_name)
896        cp.replace_contents(self)
897        return cp

Make a copy of this file.

@must_be_writable
@synchronized
def replace_contents(self, other):
899    @must_be_writable
900    @synchronized
901    def replace_contents(self, other):
902        """Replace segments of this file with segments from another `ArvadosFile` object."""
903
904        map_loc = {}
905        self._segments = []
906        for other_segment in other.segments():
907            new_loc = other_segment.locator
908            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
909                if other_segment.locator not in map_loc:
910                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
911                    if bufferblock.state() != _BufferBlock.WRITABLE:
912                        map_loc[other_segment.locator] = bufferblock.locator()
913                    else:
914                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
915                new_loc = map_loc[other_segment.locator]
916
917            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
918
919        self.set_committed(False)

Replace segments of this file with segments from another ArvadosFile object.

@synchronized
def set_segments(self, segs):
954    @synchronized
955    def set_segments(self, segs):
956        self._segments = segs
@synchronized
def set_committed(self, value=True):
958    @synchronized
959    def set_committed(self, value=True):
960        """Set committed flag.
961
962        If value is True, set committed to be True.
963
964        If value is False, set committed to be False for this and all parents.
965        """
966        if value == self._committed:
967            return
968        self._committed = value
969        if self._committed is False and self.parent is not None:
970            self.parent.set_committed(False)

Set committed flag.

If value is True, set committed to be True.

If value is False, set committed to be False for this and all parents.

@synchronized
def committed(self):
972    @synchronized
973    def committed(self):
974        """Get whether this is committed or not."""
975        return self._committed

Get whether this is committed or not.

@synchronized
def add_writer(self, writer):
977    @synchronized
978    def add_writer(self, writer):
979        """Add an ArvadosFileWriter reference to the list of writers"""
980        if isinstance(writer, ArvadosFileWriter):
981            self._writers.add(writer)

Add an ArvadosFileWriter reference to the list of writers

@synchronized
def remove_writer(self, writer, flush):
983    @synchronized
984    def remove_writer(self, writer, flush):
985        """
986        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
987        and do some block maintenance tasks.
988        """
989        self._writers.remove(writer)
990
991        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
992            # File writer closed, not small enough for repacking
993            self.flush()
994        elif self.closed():
995            # All writers closed and size is adequate for repacking
996            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks.

def closed(self):
 998    def closed(self):
 999        """
1000        Get whether this is closed or not. When the writers list is empty, the file
1001        is supposed to be closed.
1002        """
1003        return len(self._writers) == 0

Get whether this is closed or not. When the writers list is empty, the file is supposed to be closed.

@must_be_writable
@synchronized
def truncate(self, size):
1005    @must_be_writable
1006    @synchronized
1007    def truncate(self, size):
1008        """Shrink or expand the size of the file.
1009
1010        If `size` is less than the size of the file, the file contents after
1011        `size` will be discarded.  If `size` is greater than the current size
1012        of the file, it will be filled with zero bytes.
1013
1014        """
1015        if size < self.size():
1016            new_segs = []
1017            for r in self._segments:
1018                range_end = r.range_start+r.range_size
1019                if r.range_start >= size:
1020                    # segment is past the truncate size, all done
1021                    break
1022                elif size < range_end:
1023                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
1024                    nr.segment_offset = r.segment_offset
1025                    new_segs.append(nr)
1026                    break
1027                else:
1028                    new_segs.append(r)
1029
1030            self._segments = new_segs
1031            self.set_committed(False)
1032        elif size > self.size():
1033            padding = self.parent._my_block_manager().get_padding_block()
1034            diff = size - self.size()
1035            while diff > config.KEEP_BLOCK_SIZE:
1036                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
1037                diff -= config.KEEP_BLOCK_SIZE
1038            if diff > 0:
1039                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
1040            self.set_committed(False)
1041        else:
1042            # size == self.size()
1043            pass

Shrink or expand the size of the file.

If size is less than the size of the file, the file contents after size will be discarded. If size is greater than the current size of the file, it will be filled with zero bytes.
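
A hedged sketch of both directions, going through a Collection writer (assuming a configured Arvados client; the file name and contents are illustrative). A shrink discards data, a grow pads with zero bytes:

import arvados.collection

coll = arvados.collection.Collection()
with coll.open('data.bin', 'wb') as f:
    f.write(b'0123456789')
    f.truncate(4)          # keep only the first 4 bytes
    f.truncate(8)          # grow back to 8 bytes; bytes 4-7 become zero padding

with coll.open('data.bin', 'rb') as f:
    print(f.read())        # b'0123\x00\x00\x00\x00'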

def readfrom(self, offset, size, num_retries, exact=False):
1045    def readfrom(self, offset, size, num_retries, exact=False):
1046        """Read up to `size` bytes from the file starting at `offset`.
1047
1048        :exact:
1049         If False (default), return less data than requested if the read
1050         crosses a block boundary and the next block isn't cached.  If True,
1051         only return less data than requested when hitting EOF.
1052        """
1053
1054        with self.lock:
1055            if size == 0 or offset >= self.size():
1056                return b''
1057            readsegs = locators_and_ranges(self._segments, offset, size)
1058
1059            prefetch = None
1060            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
1061            if prefetch_lookahead:
1062                # Doing prefetch on every read() call is surprisingly expensive
1063                # when we're trying to deliver data at 600+ MiBps and want
1064                # the read() fast path to be as lightweight as possible.
1065                #
1066                # Only prefetching every 128 read operations
1067                # dramatically reduces the overhead while still
1068                # getting the benefit of prefetching (e.g. when
1069                # reading 128 KiB at a time, it checks for prefetch
1070                # every 16 MiB).
1071                self._read_counter = (self._read_counter+1) % 128
1072                if self._read_counter == 1:
1073                    prefetch = locators_and_ranges(self._segments,
1074                                                   offset + size,
1075                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
1076                                                   limit=(1+prefetch_lookahead))
1077
1078        locs = set()
1079        data = []
1080        for lr in readsegs:
1081            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
1082            if block:
1083                blockview = memoryview(block)
1084                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
1085                locs.add(lr.locator)
1086            else:
1087                break
1088
1089        if prefetch:
1090            for lr in prefetch:
1091                if lr.locator not in locs:
1092                    self.parent._my_block_manager().block_prefetch(lr.locator)
1093                    locs.add(lr.locator)
1094
1095        if len(data) == 1:
1096            return data[0]
1097        else:
1098            return b''.join(data)

Read up to size bytes from the file starting at offset.

:exact: If False (default), return less data than requested if the read crosses a block boundary and the next block isn’t cached. If True, only return less data than requested when hitting EOF.
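
A sketch of the difference the exact flag makes, reaching the underlying ArvadosFile through the collection's find() lookup (file name and num_retries value are illustrative):

import arvados.collection

coll = arvados.collection.Collection()
with coll.open('data.bin', 'wb') as f:
    f.write(b'x' * 4096)

af = coll.find('data.bin')                               # the underlying ArvadosFile
part = af.readfrom(0, 1024, num_retries=2)               # may be short if a block isn't cached
full = af.readfrom(0, 1024, num_retries=2, exact=True)   # short reads only at EOF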

@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
1100    @must_be_writable
1101    @synchronized
1102    def writeto(self, offset, data, num_retries):
1103        """Write `data` to the file starting at `offset`.
1104
1105        This will update existing bytes and/or extend the size of the file as
1106        necessary.
1107
1108        """
1109        if not isinstance(data, bytes) and not isinstance(data, memoryview):
1110            data = data.encode()
1111        if len(data) == 0:
1112            return
1113
1114        if offset > self.size():
1115            self.truncate(offset)
1116
1117        if len(data) > config.KEEP_BLOCK_SIZE:
1118            # Chunk it up into smaller writes
1119            n = 0
1120            dataview = memoryview(data)
1121            while n < len(data):
1122                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1123                n += config.KEEP_BLOCK_SIZE
1124            return
1125
1126        self.set_committed(False)
1127
1128        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1129            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1130
1131        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1132            self._current_bblock.repack_writes()
1133            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1134                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1135                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1136
1137        self._current_bblock.append(data)
1138
1139        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1140
1141        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1142
1143        return len(data)

Write data to the file starting at offset.

This will update existing bytes and/or extend the size of the file as necessary.
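
A sketch of random-access overwriting through this low-level interface (names and contents are illustrative); writes beyond the current end of file extend it:

import arvados.collection

coll = arvados.collection.Collection()
with coll.open('greeting.txt', 'wb') as f:
    f.write(b'hello world\n')

af = coll.find('greeting.txt')
af.writeto(6, b'there', num_retries=2)     # overwrite bytes 6-10 in place
with coll.open('greeting.txt', 'rb') as f:
    print(f.read())                        # b'hello there\n'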

@synchronized
def flush(self, sync=True, num_retries=0):
1145    @synchronized
1146    def flush(self, sync=True, num_retries=0):
1147        """Flush the current bufferblock to Keep.
1148
1149        :sync:
1150          If True, commit block synchronously, wait until buffer block has been written.
1151          If False, commit block asynchronously, return immediately after putting block into
1152          the keep put queue.
1153        """
1154        if self.committed():
1155            return
1156
1157        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1158            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1159                self._current_bblock.repack_writes()
1160            if self._current_bblock.state() != _BufferBlock.DELETED:
1161                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1162
1163        if sync:
1164            to_delete = set()
1165            for s in self._segments:
1166                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1167                if bb:
1168                    if bb.state() != _BufferBlock.COMMITTED:
1169                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1170                    to_delete.add(s.locator)
1171                    s.locator = bb.locator()
1172            for s in to_delete:
1173                # Don't delete the bufferblock if it's owned by many files. It'll be
1174                # deleted after all of its owners are flush()ed.
1175                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
1176                    self.parent._my_block_manager().delete_bufferblock(s)
1177
1178        self.parent.notify(MOD, self.parent, self.name, (self, self))

Flush the current bufferblock to Keep.

:sync: If True, commit block synchronously, wait until buffer block has been written. If False, commit block asynchronously, return immediately after putting block into the keep put queue.
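
A sketch of flushing a single file's buffered data before its writer is closed; with sync=True the call blocks until every block backing the file has reached Keep (names are illustrative, and a configured Arvados client is assumed):

import arvados.collection

coll = arvados.collection.Collection()
f = coll.open('log.txt', 'wb')
f.write(b'first batch of data\n')

coll.find('log.txt').flush(sync=True)   # wait for the bufferblock to be committed to Keep
f.close()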

@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
1180    @must_be_writable
1181    @synchronized
1182    def add_segment(self, blocks, pos, size):
1183        """Add a segment to the end of the file.
1184
1185        `pos` and `size` reference a section of the stream described by
1186        `blocks` (a list of Range objects)
1187
1188        """
1189        self._add_segment(blocks, pos, size)

Add a segment to the end of the file.

pos and size reference a section of the stream described by blocks (a list of Range objects).

@synchronized
def size(self):
1199    @synchronized
1200    def size(self):
1201        """Get the file size."""
1202        if self._segments:
1203            n = self._segments[-1]
1204            return n.range_start + n.range_size
1205        else:
1206            return 0

Get the file size.

@synchronized
def manifest_text(self, stream_name='.', portable_locators=False, normalize=False, only_committed=False):
1208    @synchronized
1209    def manifest_text(self, stream_name=".", portable_locators=False,
1210                      normalize=False, only_committed=False):
1211        buf = ""
1212        filestream = []
1213        for segment in self._segments:
1214            loc = segment.locator
1215            if self.parent._my_block_manager().is_bufferblock(loc):
1216                if only_committed:
1217                    continue
1218                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
1219            if portable_locators:
1220                loc = KeepLocator(loc).stripped()
1221            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1222                                 segment.segment_offset, segment.range_size))
1223        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
1224        buf += "\n"
1225        return buf
fuse_entry
class ArvadosFileReader(ArvadosFileReaderBase):
1238class ArvadosFileReader(ArvadosFileReaderBase):
1239    """Wraps ArvadosFile in a file-like object supporting reading only.
1240
1241    Be aware that this class is NOT thread safe as there is no locking around
1242    updating file pointer.
1243
1244    """
1245
1246    def __init__(self, arvadosfile, mode="r", num_retries=None):
1247        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1248        self.arvadosfile = arvadosfile
1249
1250    def size(self):
1251        return self.arvadosfile.size()
1252
1253    def stream_name(self):
1254        return self.arvadosfile.parent.stream_name()
1255
1256    def readinto(self, b):
1257        data = self.read(len(b))
1258        b[:len(data)] = data
1259        return len(data)
1260
1261    @_FileLikeObjectBase._before_close
1262    @retry_method
1263    def read(self, size=None, num_retries=None):
1264        """Read up to `size` bytes from the file and return the result.
1265
1266        Starts at the current file position.  If `size` is None, read the
1267        entire remainder of the file.
1268        """
1269        if size is None:
1270            data = []
1271            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1272            while rd:
1273                data.append(rd)
1274                self._filepos += len(rd)
1275                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1276            return b''.join(data)
1277        else:
1278            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1279            self._filepos += len(data)
1280            return data
1281
1282    @_FileLikeObjectBase._before_close
1283    @retry_method
1284    def readfrom(self, offset, size, num_retries=None):
1285        """Read up to `size` bytes from the stream, starting at the specified file offset.
1286
1287        This method does not change the file position.
1288        """
1289        return self.arvadosfile.readfrom(offset, size, num_retries)
1290
1291    def flush(self):
1292        pass

Wraps ArvadosFile in a file-like object supporting reading only.

Be aware that this class is NOT thread safe as there is no locking around updating file pointer.
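
A sketch of constructing a reader directly from a collection entry (the collection UUID is hypothetical); because the class is not thread safe, give each thread its own reader:

import arvados.collection
from arvados.arvfile import ArvadosFileReader

coll = arvados.collection.Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')   # hypothetical UUID
reader = ArvadosFileReader(coll.find('data.bin'), num_retries=3)
header = reader.read(1024)    # up to 1 KiB from the current position
reader.seek(0)                # readers are seekable
reader.close()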

ArvadosFileReader(arvadosfile, mode='r', num_retries=None)
1246    def __init__(self, arvadosfile, mode="r", num_retries=None):
1247        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
1248        self.arvadosfile = arvadosfile
arvadosfile
def size(self):
1250    def size(self):
1251        return self.arvadosfile.size()
def stream_name(self):
1253    def stream_name(self):
1254        return self.arvadosfile.parent.stream_name()
def readinto(self, b):
1256    def readinto(self, b):
1257        data = self.read(len(b))
1258        b[:len(data)] = data
1259        return len(data)
@retry_method
def read(self, size=None, num_retries=None):
1261    @_FileLikeObjectBase._before_close
1262    @retry_method
1263    def read(self, size=None, num_retries=None):
1264        """Read up to `size` bytes from the file and return the result.
1265
1266        Starts at the current file position.  If `size` is None, read the
1267        entire remainder of the file.
1268        """
1269        if size is None:
1270            data = []
1271            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1272            while rd:
1273                data.append(rd)
1274                self._filepos += len(rd)
1275                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1276            return b''.join(data)
1277        else:
1278            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1279            self._filepos += len(data)
1280            return data

Read up to size bytes from the file and return the result.

Starts at the current file position. If size is None, read the entire remainder of the file.

@retry_method
def readfrom(self, offset, size, num_retries=None):
1282    @_FileLikeObjectBase._before_close
1283    @retry_method
1284    def readfrom(self, offset, size, num_retries=None):
1285        """Read up to `size` bytes from the stream, starting at the specified file offset.
1286
1287        This method does not change the file position.
1288        """
1289        return self.arvadosfile.readfrom(offset, size, num_retries)

Read up to size bytes from the stream, starting at the specified file offset.

This method does not change the file position.
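
A sketch contrasting it with read(): readfrom() takes an explicit offset and leaves the reader's position alone (collection UUID and file name are illustrative):

import arvados.collection
from arvados.arvfile import ArvadosFileReader

coll = arvados.collection.Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')   # hypothetical UUID
reader = ArvadosFileReader(coll.find('data.bin'))
head = reader.read(64)               # sequential read, advances the position
window = reader.readfrom(4096, 64)   # absolute offset; reader.tell() is unchanged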

def flush(self):
1291    def flush(self):
1292        pass
class ArvadosFileWriter(ArvadosFileReader):
1295class ArvadosFileWriter(ArvadosFileReader):
1296    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1297
1298    Be aware that this class is NOT thread safe as there is no locking around
1299    updating file pointer.
1300
1301    """
1302
1303    def __init__(self, arvadosfile, mode, num_retries=None):
1304        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1305        self.arvadosfile.add_writer(self)
1306
1307    def writable(self):
1308        return True
1309
1310    @_FileLikeObjectBase._before_close
1311    @retry_method
1312    def write(self, data, num_retries=None):
1313        if self.mode[0] == "a":
1314            self._filepos = self.size()
1315        self.arvadosfile.writeto(self._filepos, data, num_retries)
1316        self._filepos += len(data)
1317        return len(data)
1318
1319    @_FileLikeObjectBase._before_close
1320    @retry_method
1321    def writelines(self, seq, num_retries=None):
1322        for s in seq:
1323            self.write(s, num_retries=num_retries)
1324
1325    @_FileLikeObjectBase._before_close
1326    def truncate(self, size=None):
1327        if size is None:
1328            size = self._filepos
1329        self.arvadosfile.truncate(size)
1330
1331    @_FileLikeObjectBase._before_close
1332    def flush(self):
1333        self.arvadosfile.flush()
1334
1335    def close(self, flush=True):
1336        if not self.closed:
1337            self.arvadosfile.remove_writer(self, flush)
1338            super(ArvadosFileWriter, self).close()

Wraps ArvadosFile in a file-like object supporting both reading and writing.

Be aware that this class is NOT thread safe as there is no locking around updating file pointer.
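
A sketch of the usual way writers are obtained, via Collection.open() in a write or append mode (file name illustrative, configured client assumed); in append mode every write() moves to end-of-file first:

import arvados.collection

coll = arvados.collection.Collection()
with coll.open('log.txt', 'wb') as w:   # ArvadosFileWriter under the hood
    w.write(b'first line\n')
with coll.open('log.txt', 'ab') as w:   # append mode: writes always land at EOF
    w.write(b'second line\n')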

ArvadosFileWriter(arvadosfile, mode, num_retries=None)
1303    def __init__(self, arvadosfile, mode, num_retries=None):
1304        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
1305        self.arvadosfile.add_writer(self)
def writable(self):
1307    def writable(self):
1308        return True
@retry_method
def write(self, data, num_retries=None):
1310    @_FileLikeObjectBase._before_close
1311    @retry_method
1312    def write(self, data, num_retries=None):
1313        if self.mode[0] == "a":
1314            self._filepos = self.size()
1315        self.arvadosfile.writeto(self._filepos, data, num_retries)
1316        self._filepos += len(data)
1317        return len(data)
@retry_method
def writelines(self, seq, num_retries=None):
1319    @_FileLikeObjectBase._before_close
1320    @retry_method
1321    def writelines(self, seq, num_retries=None):
1322        for s in seq:
1323            self.write(s, num_retries=num_retries)
def truncate(self, size=None):
1325    @_FileLikeObjectBase._before_close
1326    def truncate(self, size=None):
1327        if size is None:
1328            size = self._filepos
1329        self.arvadosfile.truncate(size)
def flush(self):
1331    @_FileLikeObjectBase._before_close
1332    def flush(self):
1333        self.arvadosfile.flush()
def close(self, flush=True):
1335    def close(self, flush=True):
1336        if not self.closed:
1337            self.arvadosfile.remove_writer(self, flush)
1338            super(ArvadosFileWriter, self).close()
class WrappableFile:
1341class WrappableFile(object):
1342    """An interface to an Arvados file that's compatible with io wrappers.
1343
1344    """
1345    def __init__(self, f):
1346        self.f = f
1347        self.closed = False
1348    def close(self):
1349        self.closed = True
1350        return self.f.close()
1351    def flush(self):
1352        return self.f.flush()
1353    def read(self, *args, **kwargs):
1354        return self.f.read(*args, **kwargs)
1355    def readable(self):
1356        return self.f.readable()
1357    def readinto(self, *args, **kwargs):
1358        return self.f.readinto(*args, **kwargs)
1359    def seek(self, *args, **kwargs):
1360        return self.f.seek(*args, **kwargs)
1361    def seekable(self):
1362        return self.f.seekable()
1363    def tell(self):
1364        return self.f.tell()
1365    def writable(self):
1366        return self.f.writable()
1367    def write(self, *args, **kwargs):
1368        return self.f.write(*args, **kwargs)

An interface to an Arvados file that’s compatible with io wrappers.
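
A sketch of the intended use: hand the wrapper to the standard io machinery, for example io.TextIOWrapper for text decoding and line iteration (assumes a collection containing the file; names are illustrative):

import io
import arvados.collection
from arvados.arvfile import ArvadosFileReader, WrappableFile

coll = arvados.collection.Collection()
with coll.open('notes.txt', 'w') as f:
    f.write('alpha\nbeta\n')

raw = WrappableFile(ArvadosFileReader(coll.find('notes.txt')))
for line in io.TextIOWrapper(raw, encoding='utf-8'):
    print(line.rstrip())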

WrappableFile(f)
1345    def __init__(self, f):
1346        self.f = f
1347        self.closed = False
f
closed
def close(self):
1348    def close(self):
1349        self.closed = True
1350        return self.f.close()
def flush(self):
1351    def flush(self):
1352        return self.f.flush()
def read(self, *args, **kwargs):
1353    def read(self, *args, **kwargs):
1354        return self.f.read(*args, **kwargs)
def readable(self):
1355    def readable(self):
1356        return self.f.readable()
def readinto(self, *args, **kwargs):
1357    def readinto(self, *args, **kwargs):
1358        return self.f.readinto(*args, **kwargs)
def seek(self, *args, **kwargs):
1359    def seek(self, *args, **kwargs):
1360        return self.f.seek(*args, **kwargs)
def seekable(self):
1361    def seekable(self):
1362        return self.f.seekable()
def tell(self):
1363    def tell(self):
1364        return self.f.tell()
def writable(self):
1365    def writable(self):
1366        return self.f.writable()
def write(self, *args, **kwargs):
1367    def write(self, *args, **kwargs):
1368        return self.f.write(*args, **kwargs)