arvados.arvfile
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import queue
import re
import sys
import threading
import uuid
import zlib

from . import config
from ._internal import streams
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')

def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
                data = data.encode()
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    STATE_TRANSITIONS = frozenset([
            (WRITABLE, PENDING),
            (PENDING, COMMITTED),
            (PENDING, ERROR),
            (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2

    def __init__(self, keep,
                 copies=None,
                 put_threads=None,
                 num_retries=None,
                 storage_classes_func=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self.lock = threading.Lock()
        self.prefetch_lookahead = self._keep.num_prefetch_threads
        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
        self.copies = copies
        self.storage_classes = storage_classes_func or (lambda: [])
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None
        self.num_retries = num_retries

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""

        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in self._bufferblocks.values()
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        new_bb.owner = []
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.blockid
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
                if block.state() == _BufferBlock.COMMITTED:
                    return
                elif block.state() == _BufferBlock.ERROR:
                    raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        if locator in self._bufferblocks:
            bb = self._bufferblocks[locator]
            bb.clear()
            del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and return that, if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = list(self._bufferblocks.items())

        for k,v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                # Ignore blocks with a list of owners, as if they're not in COMMITTED
                # state, they're already being committed asynchronously.
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k,v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k,v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=True)
                elif isinstance(v.owner, list) and len(v.owner) > 0:
                    # This bufferblock is referenced by many files as a result
                    # of repacking small blocks, so don't delete it when flushing
                    # its owners, just do it after flushing them all.
                    for owner in v.owner:
                        owner.flush(sync=True)
                    self.delete_bufferblock(k)

        self.stop_threads()

    def block_prefetch(self, locator):
        """Initiate a background download of a block.
        """

        if not self.prefetch_lookahead:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self._keep.block_prefetch(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
        self._read_counter = 0

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segments' locators has expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segments' locators has a +R signature"""

        for s in self._segments:
            if '+R' in s.locator:
                return True
        return False

    @synchronized
    def _copy_remote_blocks(self, remote_blocks={}):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
          Shared cache of remote to local block mappings. This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories.
        """

        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
        """Read up to `size` bytes from the file starting at `offset`.

        Arguments:

        * exact: bool --- If False (default), return less data than
         requested if the read crosses a block boundary and the next
         block isn't cached.  If True, only return less data than
         requested when hitting EOF.

        * return_memoryview: bool --- If False (default) return a
          `bytes` object, which may entail making a copy in some
          situations.  If True, return a `memoryview` object which may
          avoid making a copy, but may be incompatible with code
          expecting a `bytes` object.

        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return memoryview(b'') if return_memoryview else b''
            readsegs = streams.locators_and_ranges(self._segments, offset, size)

            prefetch = None
            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
            if prefetch_lookahead:
                # Doing prefetch on every read() call is surprisingly expensive
                # when we're trying to deliver data at 600+ MiBps and want
                # the read() fast path to be as lightweight as possible.
                #
                # Only prefetching every 128 read operations
                # dramatically reduces the overhead while still
                # getting the benefit of prefetching (e.g. when
                # reading 128 KiB at a time, it checks for prefetch
                # every 16 MiB).
                self._read_counter = (self._read_counter+1) % 128
                if self._read_counter == 1:
                    prefetch = streams.locators_and_ranges(
                        self._segments,
                        offset + size,
                        config.KEEP_BLOCK_SIZE * prefetch_lookahead,
                        limit=(1+prefetch_lookahead),
                    )

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                locs.add(lr.locator)
            else:
                break

        if prefetch:
            for lr in prefetch:
                if lr.locator not in locs:
                    self.parent._my_block_manager().block_prefetch(lr.locator)
                    locs.add(lr.locator)

        if len(data) == 1:
            return data[0] if return_memoryview else data[0].tobytes()
        else:
            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)


    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)
        streams.replace_range(
            self._segments,
            offset,
            len(data),
            self._current_bblock.blockid,
            self._current_bblock.write_pointer - len(data),
        )
        self.parent.notify(WRITE, self.parent, self.name, (self, self))
        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in streams.locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else streams.Range(0, 0, 0, 0)
            r = streams.Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(streams.LocatorAndRange(
                loc,
                KeepLocator(loc).size,
                segment.segment_offset,
                segment.range_size,
            ))
        buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=-1, num_retries=None, return_memoryview=False):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is negative or None,
        read the entire remainder of the file.

        Returns None if the file pointer is at the end of the file.

        Returns a `bytes` object, unless `return_memoryview` is True,
        in which case it returns a memory view, which may avoid an
        unnecessary data copy in some situations.

        """
        if size < 0 or size is None:
            data = []
            #
            # specify exact=False, return_memoryview=True here so that we
            # only copy data once into the final buffer.
            #
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        Returns a `bytes` object, unless `return_memoryview` is True,
        in which case it returns a memory view, which may avoid an
        unnecessary data copy in some situations.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()


class WrappableFile(object):
    """An interface to an Arvados file that's compatible with io wrappers.

    """
    def __init__(self, f):
        self.f = f
        self.closed = False
    def close(self):
        self.closed = True
        return self.f.close()
    def flush(self):
        return self.f.flush()
    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)
    def readable(self):
        return self.f.readable()
    def readinto(self, *args, **kwargs):
        return self.f.readinto(*args, **kwargs)
    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)
    def seekable(self):
        return self.f.seekable()
    def tell(self):
        return self.f.tell()
    def writable(self):
        return self.f.writable()
    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)
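
The split() helper above separates a stream path into its stream name and file name, defaulting the stream to '.'. A minimal illustration, using only the function defined in this module:

    from arvados.arvfile import split

    split('output/logs/stderr.txt')   # -> ('output/logs', 'stderr.txt')
    split('stderr.txt')               # -> ('.', 'stderr.txt')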
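ArvadosFileReader and ArvadosFileWriter are normally obtained indirectly by opening a path on a Collection rather than by constructing them by hand. The sketch below assumes the arvados.collection.Collection API (open() and manifest_text()) and a configured Arvados client with access to Keep; treat it as an illustration of how writes land in buffer blocks and are committed when flushed, closed, or when the collection is saved, not as a definitive recipe.

    import arvados
    import arvados.collection

    # Writing: data goes into an in-memory _BufferBlock owned by the file's
    # ArvadosFile and is committed to Keep via the block manager (see
    # ArvadosFile.flush and _BlockManager.commit_bufferblock above).
    coll = arvados.collection.Collection()
    with coll.open('subdir/hello.txt', 'wb') as writer:
        writer.write(b'hello arvados\n')

    # Reading back: reads are served from buffer blocks when possible,
    # otherwise fetched from Keep (with optional prefetch).
    with coll.open('subdir/hello.txt', 'rb') as reader:
        assert reader.read() == b'hello arvados\n'

    # The resulting manifest references the committed block locators.
    print(coll.manifest_text())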
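For large files, the exact and return_memoryview arguments documented on ArvadosFile.readfrom let callers avoid building one large bytes object; ArvadosFileReader.read(-1) already streams KEEP_BLOCK_SIZE-sized chunks with return_memoryview=True internally. A hedged sketch of doing the same from application code, assuming `reader` is an ArvadosFileReader instance (the checksum_chunks helper is hypothetical, not part of this module):

    import hashlib

    from arvados import config

    def checksum_chunks(reader, chunk_size=config.KEEP_BLOCK_SIZE):
        # readfrom() does not move the file pointer, so track the offset here.
        # return_memoryview=True avoids an extra copy per chunk.
        md5 = hashlib.md5()
        offset = 0
        while True:
            chunk = reader.readfrom(offset, chunk_size, return_memoryview=True)
            if len(chunk) == 0:
                break
            md5.update(chunk)
            offset += len(chunk)
        return md5.hexdigest()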
Represent a file in a Collection.
ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.
This object may be accessed from multiple threads.
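A minimal usage sketch, assuming the higher-level arvados.collection.Collection API (not defined in this module): ArvadosFile objects are normally created and reached through a Collection rather than constructed directly, and the file name below is hypothetical.

import arvados.collection

coll = arvados.collection.Collection()
with coll.open('notes.txt', 'wb') as f:    # ArvadosFileWriter over an ArvadosFile
    f.write(b'hello arvados\n')
with coll.open('notes.txt', 'rb') as f:    # ArvadosFileReader over the same ArvadosFile
    print(f.read())                        # b'hello arvados\n'

Unflushed data lives in buffer blocks managed by the collection's block manager, so the read-back above works without contacting Keep.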
def __init__(self, parent, name, stream=[], segments=[]):
    """
    ArvadosFile constructor.

    :stream:
      a list of Range objects representing a block stream

    :segments:
      a list of Range objects representing segments
    """
    self.parent = parent
    self.name = name
    self._writers = set()
    self._committed = False
    self._segments = []
    self.lock = parent.root_collection().lock
    for s in segments:
        self._add_segment(stream, s.locator, s.range_size)
    self._current_bblock = None
    self._read_counter = 0
ArvadosFile constructor.
:stream: a list of Range objects representing a block stream
:segments: a list of Range objects representing segments
@synchronized
def permission_expired(self, as_of_dt=None):
    """Returns True if any of the segment's locators is expired"""
    for r in self._segments:
        if KeepLocator(r.locator).permission_expired(as_of_dt):
            return True
    return False
Returns True if any of the segments' locators has expired.
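For reference, a small sketch of the underlying check using arvados.keep.KeepLocator directly. The locator below is hypothetical (made-up hash, size, signature, and timestamp); a real signed locator is produced by the API server.

from arvados.keep import KeepLocator

# Hypothetical signed locator: md5+size+A<signature>@<hex unix timestamp>
loc = 'acbd18db4cc2f85cedef654fccc4a4d8+3+A27d37e8e8ff1f33243f5b2d5d3a1b2c4d5e6f708@65f00000'
print(KeepLocator(loc).permission_expired())  # True once the @65f00000 timestamp is in the past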
@synchronized
def has_remote_blocks(self):
    """Returns True if any of the segment's locators has a +R signature"""

    for s in self._segments:
        if '+R' in s.locator:
            return True
    return False
Returns True if any of the segments' locators has a +R signature.
@synchronized
def clone(self, new_parent, new_name):
    """Make a copy of this file."""
    cp = ArvadosFile(new_parent, new_name)
    cp.replace_contents(self)
    return cp
Make a copy of this file.
@must_be_writable
@synchronized
def replace_contents(self, other):
    """Replace segments of this file with segments from another `ArvadosFile` object."""

    map_loc = {}
    self._segments = []
    for other_segment in other.segments():
        new_loc = other_segment.locator
        if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
            if other_segment.locator not in map_loc:
                bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                if bufferblock.state() != _BufferBlock.WRITABLE:
                    map_loc[other_segment.locator] = bufferblock.locator()
                else:
                    map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
            new_loc = map_loc[other_segment.locator]

        self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

    self.set_committed(False)
Replace segments of this file with segments from another `ArvadosFile` object.
@synchronized
def set_committed(self, value=True):
    """Set committed flag.

    If value is True, set committed to be True.

    If value is False, set committed to be False for this and all parents.
    """
    if value == self._committed:
        return
    self._committed = value
    if self._committed is False and self.parent is not None:
        self.parent.set_committed(False)
Set committed flag.
If value is True, set committed to be True.
If value is False, set committed to be False for this and all parents.
@synchronized
def committed(self):
    """Get whether this is committed or not."""
    return self._committed
Get whether this is committed or not.
@synchronized
def add_writer(self, writer):
    """Add an ArvadosFileWriter reference to the list of writers"""
    if isinstance(writer, ArvadosFileWriter):
        self._writers.add(writer)
Add an ArvadosFileWriter reference to the list of writers
@synchronized
def remove_writer(self, writer, flush):
    """
    Called from ArvadosFileWriter.close(). Remove a writer reference from the list
    and do some block maintenance tasks.
    """
    self._writers.remove(writer)

    if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
        # File writer closed, not small enough for repacking
        self.flush()
    elif self.closed():
        # All writers closed and size is adequate for repacking
        self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks.
def closed(self):
    """
    Get whether this is closed or not. When the writers list is empty, the file
    is supposed to be closed.
    """
    return len(self._writers) == 0
Get whether this file is closed or not. When the writers list is empty, the file is considered closed.
@must_be_writable
@synchronized
def truncate(self, size):
    """Shrink or expand the size of the file.

    If `size` is less than the size of the file, the file contents after
    `size` will be discarded. If `size` is greater than the current size
    of the file, it will be filled with zero bytes.

    """
    if size < self.size():
        new_segs = []
        for r in self._segments:
            range_end = r.range_start+r.range_size
            if r.range_start >= size:
                # segment is past the truncate size, all done
                break
            elif size < range_end:
                nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0)
                nr.segment_offset = r.segment_offset
                new_segs.append(nr)
                break
            else:
                new_segs.append(r)

        self._segments = new_segs
        self.set_committed(False)
    elif size > self.size():
        padding = self.parent._my_block_manager().get_padding_block()
        diff = size - self.size()
        while diff > config.KEEP_BLOCK_SIZE:
            self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
            diff -= config.KEEP_BLOCK_SIZE
        if diff > 0:
            self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0))
        self.set_committed(False)
    else:
        # size == self.size()
        pass
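A short sketch of the truncate semantics above, driven through an ArvadosFileWriter from a hypothetical Collection: shrinking discards trailing segments, growing zero-fills via the shared padding block.

import arvados.collection

coll = arvados.collection.Collection()
with coll.open('data.bin', 'wb') as f:
    f.write(b'0123456789')
    f.truncate(4)      # file is now b'0123'
    f.truncate(8)      # extended with zero bytes: b'0123\x00\x00\x00\x00'
    print(f.size())    # 8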
def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False):
    """Read up to `size` bytes from the file starting at `offset`.

    Arguments:

    * exact: bool --- If False (default), return less data than
      requested if the read crosses a block boundary and the next
      block isn't cached.  If True, only return less data than
      requested when hitting EOF.

    * return_memoryview: bool --- If False (default) return a
      `bytes` object, which may entail making a copy in some
      situations.  If True, return a `memoryview` object which may
      avoid making a copy, but may be incompatible with code
      expecting a `bytes` object.

    """

    with self.lock:
        if size == 0 or offset >= self.size():
            return memoryview(b'') if return_memoryview else b''
        readsegs = streams.locators_and_ranges(self._segments, offset, size)

        prefetch = None
        prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
        if prefetch_lookahead:
            # Doing prefetch on every read() call is surprisingly expensive
            # when we're trying to deliver data at 600+ MiBps and want
            # the read() fast path to be as lightweight as possible.
            #
            # Only prefetching every 128 read operations
            # dramatically reduces the overhead while still
            # getting the benefit of prefetching (e.g. when
            # reading 128 KiB at a time, it checks for prefetch
            # every 16 MiB).
            self._read_counter = (self._read_counter+1) % 128
            if self._read_counter == 1:
                prefetch = streams.locators_and_ranges(
                    self._segments,
                    offset + size,
                    config.KEEP_BLOCK_SIZE * prefetch_lookahead,
                    limit=(1+prefetch_lookahead),
                )

    locs = set()
    data = []
    for lr in readsegs:
        block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
        if block:
            blockview = memoryview(block)
            data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
            locs.add(lr.locator)
        else:
            break

    if prefetch:
        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

    if len(data) == 1:
        return data[0] if return_memoryview else data[0].tobytes()
    else:
        return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
Read up to `size` bytes from the file starting at `offset`.

Arguments:

* exact: bool --- If False (default), return less data than requested if the read crosses a block boundary and the next block isn't cached. If True, only return less data than requested when hitting EOF.

* return_memoryview: bool --- If False (default), return a `bytes` object, which may entail making a copy in some situations. If True, return a `memoryview` object, which may avoid making a copy but may be incompatible with code expecting a `bytes` object.
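To illustrate the `return_memoryview` option, a hedged sketch using the reader-level readfrom() (documented further below), which calls this method with exact=True. Here `reader` is assumed to be an ArvadosFileReader over an existing file.

view = reader.readfrom(0, 65536, return_memoryview=True)  # may avoid an intermediate copy
buf = bytearray(len(view))
buf[:] = view  # the copy happens only here, under the caller's control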
@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
    """Write `data` to the file starting at `offset`.

    This will update existing bytes and/or extend the size of the file as
    necessary.

    """
    if not isinstance(data, bytes) and not isinstance(data, memoryview):
        data = data.encode()
    if len(data) == 0:
        return

    if offset > self.size():
        self.truncate(offset)

    if len(data) > config.KEEP_BLOCK_SIZE:
        # Chunk it up into smaller writes
        n = 0
        dataview = memoryview(data)
        while n < len(data):
            self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
            n += config.KEEP_BLOCK_SIZE
        return

    self.set_committed(False)

    if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
        self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

    if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
        self._current_bblock.repack_writes()
        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

    self._current_bblock.append(data)
    streams.replace_range(
        self._segments,
        offset,
        len(data),
        self._current_bblock.blockid,
        self._current_bblock.write_pointer - len(data),
    )
    self.parent.notify(WRITE, self.parent, self.name, (self, self))
    return len(data)
Write `data` to the file starting at `offset`.

This will update existing bytes and/or extend the size of the file as necessary.
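A small sketch of random-access writes through ArvadosFileWriter, which forwards to this method; the Collection and file name are hypothetical.

import arvados.collection

coll = arvados.collection.Collection()
with coll.open('random.bin', 'wb') as f:
    f.write(b'AAAAAAAA')   # size is now 8
    f.seek(2)
    f.write(b'bb')         # overwrite bytes 2-3 in place
with coll.open('random.bin', 'rb') as f:
    print(f.read())        # b'AAbbAAAA'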
@synchronized
def flush(self, sync=True, num_retries=0):
    """Flush the current bufferblock to Keep.

    :sync:
      If True, commit block synchronously, wait until buffer block has been written.
      If False, commit block asynchronously, return immediately after putting block into
      the keep put queue.
    """
    if self.committed():
        return

    if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
        if self._current_bblock.state() == _BufferBlock.WRITABLE:
            self._current_bblock.repack_writes()
        if self._current_bblock.state() != _BufferBlock.DELETED:
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

    if sync:
        to_delete = set()
        for s in self._segments:
            bb = self.parent._my_block_manager().get_bufferblock(s.locator)
            if bb:
                if bb.state() != _BufferBlock.COMMITTED:
                    self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                to_delete.add(s.locator)
                s.locator = bb.locator()
        for s in to_delete:
            # Don't delete the bufferblock if it's owned by many files. It'll be
            # deleted after all of its owners are flush()ed.
            if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                self.parent._my_block_manager().delete_bufferblock(s)

    self.parent.notify(MOD, self.parent, self.name, (self, self))
Flush the current bufferblock to Keep.

:sync: If True, commit the block synchronously and wait until the buffer block has been written. If False, commit the block asynchronously and return immediately after putting the block on the Keep put queue.
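A hedged sketch of calling flush explicitly through the writer (which uses the synchronous default). This assumes a configured Arvados client, since flushing actually writes the buffer block to Keep; the file name is hypothetical.

import arvados.collection

coll = arvados.collection.Collection()
f = coll.open('log.txt', 'wb')
f.write(b'first batch\n')
f.flush()    # commit the current buffer block to Keep now, before closing
# ... continue writing; close() will flush again as needed
f.close()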
@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
    """Add a segment to the end of the file.

    `pos` and `offset` reference a section of the stream described by
    `blocks` (a list of Range objects)

    """
    self._add_segment(blocks, pos, size)
Add a segment to the end of the file.

`pos` and `size` reference a section of the stream described by `blocks` (a list of Range objects).
@synchronized
def size(self):
    """Get the file size."""
    if self._segments:
        n = self._segments[-1]
        return n.range_start + n.range_size
    else:
        return 0
Get the file size.
@synchronized
def manifest_text(self, stream_name=".", portable_locators=False,
                  normalize=False, only_committed=False):
    buf = ""
    filestream = []
    for segment in self._segments:
        loc = segment.locator
        if self.parent._my_block_manager().is_bufferblock(loc):
            if only_committed:
                continue
            loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
        if portable_locators:
            loc = KeepLocator(loc).stripped()
        filestream.append(streams.LocatorAndRange(
            loc,
            KeepLocator(loc).size,
            segment.segment_offset,
            segment.range_size,
        ))
    buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream}))
    buf += "\n"
    return buf
class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=-1, num_retries=None, return_memoryview=False):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position. If `size` is negative or None,
        read the entire remainder of the file.

        Returns None if the file pointer is at the end of the file.

        Returns a `bytes` object, unless `return_memoryview` is True,
        in which case it returns a memory view, which may avoid an
        unnecessary data copy in some situations.

        """
        if size < 0 or size is None:
            data = []
            #
            # specify exact=False, return_memoryview=True here so that we
            # only copy data once into the final buffer.
            #
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
            return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.

        Returns a `bytes` object, unless `return_memoryview` is True,
        in which case it returns a memory view, which may avoid an
        unnecessary data copy in some situations.

        """
        return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)

    def flush(self):
        pass
Wraps ArvadosFile in a file-like object supporting reading only.

Be aware that this class is NOT thread safe, as there is no locking around updates to the file pointer.
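A reading sketch, assuming the reader is obtained from Collection.open() in binary read mode (hypothetical file name; the write at the top just sets up content):

import arvados.collection

coll = arvados.collection.Collection()
with coll.open('notes.txt', 'wb') as f:
    f.write(b'line one\nline two\n')

with coll.open('notes.txt', 'rb') as f:
    print(f.size())      # 18
    for line in f:       # iterates line by line via readline()
        print(line)      # b'line one\n', then b'line two\n'
    f.seek(0)
    print(f.read(4))     # b'line'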
@_FileLikeObjectBase._before_close
@retry_method
def read(self, size=-1, num_retries=None, return_memoryview=False):
    """Read up to `size` bytes from the file and return the result.

    Starts at the current file position. If `size` is negative or None,
    read the entire remainder of the file.

    Returns None if the file pointer is at the end of the file.

    Returns a `bytes` object, unless `return_memoryview` is True,
    in which case it returns a memory view, which may avoid an
    unnecessary data copy in some situations.

    """
    if size < 0 or size is None:
        data = []
        #
        # specify exact=False, return_memoryview=True here so that we
        # only copy data once into the final buffer.
        #
        rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
        while rd:
            data.append(rd)
            self._filepos += len(rd)
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True)
        return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
    else:
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview)
        self._filepos += len(data)
        return data
Read up to `size` bytes from the file and return the result.

Starts at the current file position. If `size` is negative or None, read the entire remainder of the file.

Returns an empty result if the file pointer is at the end of the file.

Returns a `bytes` object, unless `return_memoryview` is True, in which case it returns a memory view, which may avoid an unnecessary data copy in some situations.
@_FileLikeObjectBase._before_close
@retry_method
def readfrom(self, offset, size, num_retries=None, return_memoryview=False):
    """Read up to `size` bytes from the stream, starting at the specified file offset.

    This method does not change the file position.

    Returns a `bytes` object, unless `return_memoryview` is True,
    in which case it returns a memory view, which may avoid an
    unnecessary data copy in some situations.

    """
    return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
Read up to `size` bytes from the stream, starting at the specified file offset.

This method does not change the file position.

Returns a `bytes` object, unless `return_memoryview` is True, in which case it returns a memory view, which may avoid an unnecessary data copy in some situations.
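A short sketch contrasting readfrom() with read(): readfrom() leaves the file position untouched, so it is convenient for out-of-order access. Here `reader` is assumed to be an ArvadosFileReader over an existing file.

pos = reader.tell()
header = reader.readfrom(0, 16)   # peek at the first 16 bytes
assert reader.tell() == pos       # position unchanged
chunk = reader.read(1024)         # read() does advance the position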
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
Wraps ArvadosFile in a file-like object supporting both reading and writing.

Be aware that this class is NOT thread safe, as there is no locking around updates to the file pointer.
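An append-mode sketch (mode 'ab'): each write() repositions to the end of the file first, so intervening seeks do not affect where appended data lands. The Collection and file name are hypothetical.

import arvados.collection

coll = arvados.collection.Collection()
with coll.open('journal.txt', 'wb') as f:
    f.write(b'first\n')
with coll.open('journal.txt', 'ab') as f:
    f.seek(0)
    f.write(b'second\n')   # append mode ignores the seek and writes at EOF
with coll.open('journal.txt', 'rb') as f:
    print(f.read())        # b'first\nsecond\n'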
class WrappableFile(object):
    """An interface to an Arvados file that's compatible with io wrappers.

    """
    def __init__(self, f):
        self.f = f
        self.closed = False
    def close(self):
        self.closed = True
        return self.f.close()
    def flush(self):
        return self.f.flush()
    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)
    def readable(self):
        return self.f.readable()
    def readinto(self, *args, **kwargs):
        return self.f.readinto(*args, **kwargs)
    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)
    def seekable(self):
        return self.f.seekable()
    def tell(self):
        return self.f.tell()
    def writable(self):
        return self.f.writable()
    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)
An interface to an Arvados file that’s compatible with io wrappers.
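WrappableFile exists so that standard io wrappers can layer on top of an Arvados file object. A hedged sketch wrapping a binary reader in io.TextIOWrapper for text decoding, mirroring what text-mode Collection.open() is expected to do internally; the file name and content are hypothetical.

import io
import arvados.collection
from arvados.arvfile import WrappableFile

coll = arvados.collection.Collection()
with coll.open('unicode.txt', 'wb') as f:
    f.write('héllo\n'.encode('utf-8'))

reader = coll.open('unicode.txt', 'rb')
text = io.TextIOWrapper(WrappableFile(reader), encoding='utf-8')
print(text.readline())   # 'héllo\n' decoded as str
text.close()             # also closes the underlying reader via WrappableFile.close()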