arvados.arvfile
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4 5import bz2 6import collections 7import copy 8import errno 9import functools 10import hashlib 11import logging 12import os 13import queue 14import re 15import sys 16import threading 17import uuid 18import zlib 19 20from . import config 21from ._internal import streams 22from .errors import KeepWriteError, AssertionError, ArgumentError 23from .keep import KeepLocator 24from .retry import retry_method 25 26ADD = "add" 27"""Argument value for `Collection` methods to represent an added item""" 28DEL = "del" 29"""Argument value for `Collection` methods to represent a removed item""" 30MOD = "mod" 31"""Argument value for `Collection` methods to represent a modified item""" 32TOK = "tok" 33"""Argument value for `Collection` methods to represent an item with token differences""" 34WRITE = "write" 35"""Argument value for `Collection` methods to represent that a file was written to""" 36 37_logger = logging.getLogger('arvados.arvfile') 38 39def split(path): 40 """split(path) -> streamname, filename 41 42 Separate the stream name and file name in a /-separated stream path and 43 return a tuple (stream_name, file_name). If no stream name is available, 44 assume '.'. 45 46 """ 47 try: 48 stream_name, file_name = path.rsplit('/', 1) 49 except ValueError: # No / in string 50 stream_name, file_name = '.', path 51 return stream_name, file_name 52 53 54class UnownedBlockError(Exception): 55 """Raised when there's an writable block without an owner on the BlockManager.""" 56 pass 57 58 59class _FileLikeObjectBase(object): 60 def __init__(self, name, mode): 61 self.name = name 62 self.mode = mode 63 self.closed = False 64 65 @staticmethod 66 def _before_close(orig_func): 67 @functools.wraps(orig_func) 68 def before_close_wrapper(self, *args, **kwargs): 69 if self.closed: 70 raise ValueError("I/O operation on closed stream file") 71 return orig_func(self, *args, **kwargs) 72 return before_close_wrapper 73 74 def __enter__(self): 75 return self 76 77 def __exit__(self, exc_type, exc_value, traceback): 78 try: 79 self.close() 80 except Exception: 81 if exc_type is None: 82 raise 83 84 def close(self): 85 self.closed = True 86 87 88class ArvadosFileReaderBase(_FileLikeObjectBase): 89 def __init__(self, name, mode, num_retries=None): 90 super(ArvadosFileReaderBase, self).__init__(name, mode) 91 self._filepos = 0 92 self.num_retries = num_retries 93 self._readline_cache = (None, None) 94 95 def __iter__(self): 96 while True: 97 data = self.readline() 98 if not data: 99 break 100 yield data 101 102 def decompressed_name(self): 103 return re.sub(r'\.(bz2|gz)$', '', self.name) 104 105 @_FileLikeObjectBase._before_close 106 def seek(self, pos, whence=os.SEEK_SET): 107 if whence == os.SEEK_CUR: 108 pos += self._filepos 109 elif whence == os.SEEK_END: 110 pos += self.size() 111 if pos < 0: 112 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.") 113 self._filepos = pos 114 return self._filepos 115 116 def tell(self): 117 return self._filepos 118 119 def readable(self): 120 return True 121 122 def writable(self): 123 return False 124 125 def seekable(self): 126 return True 127 128 @_FileLikeObjectBase._before_close 129 @retry_method 130 def readall(self, size=2**20, num_retries=None): 131 while True: 132 data = self.read(size, num_retries=num_retries) 133 if len(data) == 0: 134 break 135 yield data 136 137 @_FileLikeObjectBase._before_close 138 @retry_method 139 def readline(self, size=float('inf'), 
num_retries=None): 140 cache_pos, cache_data = self._readline_cache 141 if self.tell() == cache_pos: 142 data = [cache_data] 143 self._filepos += len(cache_data) 144 else: 145 data = [b''] 146 data_size = len(data[-1]) 147 while (data_size < size) and (b'\n' not in data[-1]): 148 next_read = self.read(2 ** 20, num_retries=num_retries) 149 if not next_read: 150 break 151 data.append(next_read) 152 data_size += len(next_read) 153 data = b''.join(data) 154 try: 155 nextline_index = data.index(b'\n') + 1 156 except ValueError: 157 nextline_index = len(data) 158 nextline_index = min(nextline_index, size) 159 self._filepos -= len(data) - nextline_index 160 self._readline_cache = (self.tell(), data[nextline_index:]) 161 return data[:nextline_index].decode() 162 163 @_FileLikeObjectBase._before_close 164 @retry_method 165 def decompress(self, decompress, size, num_retries=None): 166 for segment in self.readall(size, num_retries=num_retries): 167 data = decompress(segment) 168 if data: 169 yield data 170 171 @_FileLikeObjectBase._before_close 172 @retry_method 173 def readall_decompressed(self, size=2**20, num_retries=None): 174 self.seek(0) 175 if self.name.endswith('.bz2'): 176 dc = bz2.BZ2Decompressor() 177 return self.decompress(dc.decompress, size, 178 num_retries=num_retries) 179 elif self.name.endswith('.gz'): 180 dc = zlib.decompressobj(16+zlib.MAX_WBITS) 181 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), 182 size, num_retries=num_retries) 183 else: 184 return self.readall(size, num_retries=num_retries) 185 186 @_FileLikeObjectBase._before_close 187 @retry_method 188 def readlines(self, sizehint=float('inf'), num_retries=None): 189 data = [] 190 data_size = 0 191 for s in self.readall(num_retries=num_retries): 192 data.append(s) 193 data_size += len(s) 194 if data_size >= sizehint: 195 break 196 return b''.join(data).decode().splitlines(True) 197 198 def size(self): 199 raise IOError(errno.ENOSYS, "Not implemented") 200 201 def read(self, size, num_retries=None): 202 raise IOError(errno.ENOSYS, "Not implemented") 203 204 def readfrom(self, start, size, num_retries=None): 205 raise IOError(errno.ENOSYS, "Not implemented") 206 207 208def synchronized(orig_func): 209 @functools.wraps(orig_func) 210 def synchronized_wrapper(self, *args, **kwargs): 211 with self.lock: 212 return orig_func(self, *args, **kwargs) 213 return synchronized_wrapper 214 215 216class StateChangeError(Exception): 217 def __init__(self, message, state, nextstate): 218 super(StateChangeError, self).__init__(message) 219 self.state = state 220 self.nextstate = nextstate 221 222class _BufferBlock(object): 223 """A stand-in for a Keep block that is in the process of being written. 224 225 Writers can append to it, get the size, and compute the Keep locator. 226 There are three valid states: 227 228 WRITABLE 229 Can append to block. 230 231 PENDING 232 Block is in the process of being uploaded to Keep, append is an error. 233 234 COMMITTED 235 The block has been written to Keep, its internal buffer has been 236 released, fetching the block will fetch it via keep client (since we 237 discarded the internal copy), and identifiers referring to the BufferBlock 238 can be replaced with the block locator. 
239 240 """ 241 242 WRITABLE = 0 243 PENDING = 1 244 COMMITTED = 2 245 ERROR = 3 246 DELETED = 4 247 248 def __init__(self, blockid, starting_capacity, owner): 249 """ 250 :blockid: 251 the identifier for this block 252 253 :starting_capacity: 254 the initial buffer capacity 255 256 :owner: 257 ArvadosFile that owns this block 258 259 """ 260 self.blockid = blockid 261 self.buffer_block = bytearray(starting_capacity) 262 self.buffer_view = memoryview(self.buffer_block) 263 self.write_pointer = 0 264 self._state = _BufferBlock.WRITABLE 265 self._locator = None 266 self.owner = owner 267 self.lock = threading.Lock() 268 self.wait_for_commit = threading.Event() 269 self.error = None 270 271 @synchronized 272 def append(self, data): 273 """Append some data to the buffer. 274 275 Only valid if the block is in WRITABLE state. Implements an expanding 276 buffer, doubling capacity as needed to accomdate all the data. 277 278 """ 279 if self._state == _BufferBlock.WRITABLE: 280 if not isinstance(data, bytes) and not isinstance(data, memoryview): 281 data = data.encode() 282 while (self.write_pointer+len(data)) > len(self.buffer_block): 283 new_buffer_block = bytearray(len(self.buffer_block) * 2) 284 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer] 285 self.buffer_block = new_buffer_block 286 self.buffer_view = memoryview(self.buffer_block) 287 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data 288 self.write_pointer += len(data) 289 self._locator = None 290 else: 291 raise AssertionError("Buffer block is not writable") 292 293 STATE_TRANSITIONS = frozenset([ 294 (WRITABLE, PENDING), 295 (PENDING, COMMITTED), 296 (PENDING, ERROR), 297 (ERROR, PENDING)]) 298 299 @synchronized 300 def set_state(self, nextstate, val=None): 301 if (self._state, nextstate) not in self.STATE_TRANSITIONS: 302 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate) 303 self._state = nextstate 304 305 if self._state == _BufferBlock.PENDING: 306 self.wait_for_commit.clear() 307 308 if self._state == _BufferBlock.COMMITTED: 309 self._locator = val 310 self.buffer_view = None 311 self.buffer_block = None 312 self.wait_for_commit.set() 313 314 if self._state == _BufferBlock.ERROR: 315 self.error = val 316 self.wait_for_commit.set() 317 318 @synchronized 319 def state(self): 320 return self._state 321 322 def size(self): 323 """The amount of data written to the buffer.""" 324 return self.write_pointer 325 326 @synchronized 327 def locator(self): 328 """The Keep locator for this buffer's contents.""" 329 if self._locator is None: 330 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size()) 331 return self._locator 332 333 @synchronized 334 def clone(self, new_blockid, owner): 335 if self._state == _BufferBlock.COMMITTED: 336 raise AssertionError("Cannot duplicate committed buffer block") 337 bufferblock = _BufferBlock(new_blockid, self.size(), owner) 338 bufferblock.append(self.buffer_view[0:self.size()]) 339 return bufferblock 340 341 @synchronized 342 def clear(self): 343 self._state = _BufferBlock.DELETED 344 self.owner = None 345 self.buffer_block = None 346 self.buffer_view = None 347 348 @synchronized 349 def repack_writes(self): 350 """Optimize buffer block by repacking segments in file sequence. 351 352 When the client makes random writes, they appear in the buffer block in 353 the sequence they were written rather than the sequence they appear in 354 the file. 
This makes for inefficient, fragmented manifests. Attempt 355 to optimize by repacking writes in file sequence. 356 357 """ 358 if self._state != _BufferBlock.WRITABLE: 359 raise AssertionError("Cannot repack non-writable block") 360 361 segs = self.owner.segments() 362 363 # Collect the segments that reference the buffer block. 364 bufferblock_segs = [s for s in segs if s.locator == self.blockid] 365 366 # Collect total data referenced by segments (could be smaller than 367 # bufferblock size if a portion of the file was written and 368 # then overwritten). 369 write_total = sum([s.range_size for s in bufferblock_segs]) 370 371 if write_total < self.size() or len(bufferblock_segs) > 1: 372 # If there's more than one segment referencing this block, it is 373 # due to out-of-order writes and will produce a fragmented 374 # manifest, so try to optimize by re-packing into a new buffer. 375 contents = self.buffer_view[0:self.write_pointer].tobytes() 376 new_bb = _BufferBlock(None, write_total, None) 377 for t in bufferblock_segs: 378 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) 379 t.segment_offset = new_bb.size() - t.range_size 380 381 self.buffer_block = new_bb.buffer_block 382 self.buffer_view = new_bb.buffer_view 383 self.write_pointer = new_bb.write_pointer 384 self._locator = None 385 new_bb.clear() 386 self.owner.set_segments(segs) 387 388 def __repr__(self): 389 return "<BufferBlock %s>" % (self.blockid) 390 391 392class NoopLock(object): 393 def __enter__(self): 394 return self 395 396 def __exit__(self, exc_type, exc_value, traceback): 397 pass 398 399 def acquire(self, blocking=False): 400 pass 401 402 def release(self): 403 pass 404 405 406def must_be_writable(orig_func): 407 @functools.wraps(orig_func) 408 def must_be_writable_wrapper(self, *args, **kwargs): 409 if not self.writable(): 410 raise IOError(errno.EROFS, "Collection is read-only.") 411 return orig_func(self, *args, **kwargs) 412 return must_be_writable_wrapper 413 414 415class _BlockManager(object): 416 """BlockManager handles buffer blocks. 417 418 Also handles background block uploads, and background block prefetch for a 419 Collection of ArvadosFiles. 420 421 """ 422 423 DEFAULT_PUT_THREADS = 2 424 425 def __init__(self, keep, 426 copies=None, 427 put_threads=None, 428 num_retries=None, 429 storage_classes_func=None): 430 """keep: KeepClient object to use""" 431 self._keep = keep 432 self._bufferblocks = collections.OrderedDict() 433 self._put_queue = None 434 self._put_threads = None 435 self.lock = threading.Lock() 436 self.prefetch_lookahead = self._keep.num_prefetch_threads 437 self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS 438 self.copies = copies 439 self.storage_classes = storage_classes_func or (lambda: []) 440 self._pending_write_size = 0 441 self.threads_lock = threading.Lock() 442 self.padding_block = None 443 self.num_retries = num_retries 444 445 @synchronized 446 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): 447 """Allocate a new, empty bufferblock in WRITABLE state and return it. 
448 449 :blockid: 450 optional block identifier, otherwise one will be automatically assigned 451 452 :starting_capacity: 453 optional capacity, otherwise will use default capacity 454 455 :owner: 456 ArvadosFile that owns this block 457 458 """ 459 return self._alloc_bufferblock(blockid, starting_capacity, owner) 460 461 def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): 462 if blockid is None: 463 blockid = str(uuid.uuid4()) 464 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) 465 self._bufferblocks[bufferblock.blockid] = bufferblock 466 return bufferblock 467 468 @synchronized 469 def dup_block(self, block, owner): 470 """Create a new bufferblock initialized with the content of an existing bufferblock. 471 472 :block: 473 the buffer block to copy. 474 475 :owner: 476 ArvadosFile that owns the new block 477 478 """ 479 new_blockid = str(uuid.uuid4()) 480 bufferblock = block.clone(new_blockid, owner) 481 self._bufferblocks[bufferblock.blockid] = bufferblock 482 return bufferblock 483 484 @synchronized 485 def is_bufferblock(self, locator): 486 return locator in self._bufferblocks 487 488 def _commit_bufferblock_worker(self): 489 """Background uploader thread.""" 490 491 while True: 492 try: 493 bufferblock = self._put_queue.get() 494 if bufferblock is None: 495 return 496 497 if self.copies is None: 498 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes()) 499 else: 500 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes()) 501 bufferblock.set_state(_BufferBlock.COMMITTED, loc) 502 except Exception as e: 503 bufferblock.set_state(_BufferBlock.ERROR, e) 504 finally: 505 if self._put_queue is not None: 506 self._put_queue.task_done() 507 508 def start_put_threads(self): 509 with self.threads_lock: 510 if self._put_threads is None: 511 # Start uploader threads. 512 513 # If we don't limit the Queue size, the upload queue can quickly 514 # grow to take up gigabytes of RAM if the writing process is 515 # generating data more quickly than it can be sent to the Keep 516 # servers. 517 # 518 # With two upload threads and a queue size of 2, this means up to 4 519 # blocks pending. If they are full 64 MiB blocks, that means up to 520 # 256 MiB of internal buffering, which is the same size as the 521 # default download block cache in KeepClient. 
522 self._put_queue = queue.Queue(maxsize=2) 523 524 self._put_threads = [] 525 for i in range(0, self.num_put_threads): 526 thread = threading.Thread(target=self._commit_bufferblock_worker) 527 self._put_threads.append(thread) 528 thread.daemon = True 529 thread.start() 530 531 @synchronized 532 def stop_threads(self): 533 """Shut down and wait for background upload and download threads to finish.""" 534 535 if self._put_threads is not None: 536 for t in self._put_threads: 537 self._put_queue.put(None) 538 for t in self._put_threads: 539 t.join() 540 self._put_threads = None 541 self._put_queue = None 542 543 def __enter__(self): 544 return self 545 546 def __exit__(self, exc_type, exc_value, traceback): 547 self.stop_threads() 548 549 @synchronized 550 def repack_small_blocks(self, force=False, sync=False, closed_file_size=0): 551 """Packs small blocks together before uploading""" 552 553 self._pending_write_size += closed_file_size 554 555 # Check if there are enough small blocks for filling up one in full 556 if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)): 557 return 558 559 # Search blocks ready for getting packed together before being 560 # committed to Keep. 561 # A WRITABLE block always has an owner. 562 # A WRITABLE block with its owner.closed() implies that its 563 # size is <= KEEP_BLOCK_SIZE/2. 564 try: 565 small_blocks = [b for b in self._bufferblocks.values() 566 if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] 567 except AttributeError: 568 # Writable blocks without owner shouldn't exist. 569 raise UnownedBlockError() 570 571 if len(small_blocks) <= 1: 572 # Not enough small blocks for repacking 573 return 574 575 for bb in small_blocks: 576 bb.repack_writes() 577 578 # Update the pending write size count with its true value, just in case 579 # some small file was opened, written and closed several times. 580 self._pending_write_size = sum([b.size() for b in small_blocks]) 581 582 if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: 583 return 584 585 new_bb = self._alloc_bufferblock() 586 new_bb.owner = [] 587 files = [] 588 while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: 589 bb = small_blocks.pop(0) 590 new_bb.owner.append(bb.owner) 591 self._pending_write_size -= bb.size() 592 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) 593 files.append((bb, new_bb.write_pointer - bb.size())) 594 595 self.commit_bufferblock(new_bb, sync=sync) 596 597 for bb, new_bb_segment_offset in files: 598 newsegs = bb.owner.segments() 599 for s in newsegs: 600 if s.locator == bb.blockid: 601 s.locator = new_bb.blockid 602 s.segment_offset = new_bb_segment_offset+s.segment_offset 603 bb.owner.set_segments(newsegs) 604 self._delete_bufferblock(bb.blockid) 605 606 def commit_bufferblock(self, block, sync): 607 """Initiate a background upload of a bufferblock. 608 609 :block: 610 The block object to upload 611 612 :sync: 613 If `sync` is True, upload the block synchronously. 614 If `sync` is False, upload the block asynchronously. This will 615 return immediately unless the upload queue is at capacity, in 616 which case it will wait on an upload queue slot. 617 618 """ 619 try: 620 # Mark the block as PENDING so to disallow any more appends. 
621 block.set_state(_BufferBlock.PENDING) 622 except StateChangeError as e: 623 if e.state == _BufferBlock.PENDING: 624 if sync: 625 block.wait_for_commit.wait() 626 else: 627 return 628 if block.state() == _BufferBlock.COMMITTED: 629 return 630 elif block.state() == _BufferBlock.ERROR: 631 raise block.error 632 else: 633 raise 634 635 if sync: 636 try: 637 if self.copies is None: 638 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes()) 639 else: 640 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes()) 641 block.set_state(_BufferBlock.COMMITTED, loc) 642 except Exception as e: 643 block.set_state(_BufferBlock.ERROR, e) 644 raise 645 else: 646 self.start_put_threads() 647 self._put_queue.put(block) 648 649 @synchronized 650 def get_bufferblock(self, locator): 651 return self._bufferblocks.get(locator) 652 653 @synchronized 654 def get_padding_block(self): 655 """Get a bufferblock 64 MB in size consisting of all zeros, used as padding 656 when using truncate() to extend the size of a file. 657 658 For reference (and possible future optimization), the md5sum of the 659 padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864 660 661 """ 662 663 if self.padding_block is None: 664 self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE) 665 self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE 666 self.commit_bufferblock(self.padding_block, False) 667 return self.padding_block 668 669 @synchronized 670 def delete_bufferblock(self, locator): 671 self._delete_bufferblock(locator) 672 673 def _delete_bufferblock(self, locator): 674 if locator in self._bufferblocks: 675 bb = self._bufferblocks[locator] 676 bb.clear() 677 del self._bufferblocks[locator] 678 679 def get_block_contents(self, locator, num_retries, cache_only=False): 680 """Fetch a block. 681 682 First checks to see if the locator is a BufferBlock and return that, if 683 not, passes the request through to KeepClient.get(). 684 685 """ 686 with self.lock: 687 if locator in self._bufferblocks: 688 bufferblock = self._bufferblocks[locator] 689 if bufferblock.state() != _BufferBlock.COMMITTED: 690 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes() 691 else: 692 locator = bufferblock._locator 693 if cache_only: 694 return self._keep.get_from_cache(locator) 695 else: 696 return self._keep.get(locator, num_retries=num_retries) 697 698 def commit_all(self): 699 """Commit all outstanding buffer blocks. 700 701 This is a synchronous call, and will not return until all buffer blocks 702 are uploaded. Raises KeepWriteError() if any blocks failed to upload. 703 704 """ 705 self.repack_small_blocks(force=True, sync=True) 706 707 with self.lock: 708 items = list(self._bufferblocks.items()) 709 710 for k,v in items: 711 if v.state() != _BufferBlock.COMMITTED and v.owner: 712 # Ignore blocks with a list of owners, as if they're not in COMMITTED 713 # state, they're already being committed asynchronously. 
714 if isinstance(v.owner, ArvadosFile): 715 v.owner.flush(sync=False) 716 717 with self.lock: 718 if self._put_queue is not None: 719 self._put_queue.join() 720 721 err = [] 722 for k,v in items: 723 if v.state() == _BufferBlock.ERROR: 724 err.append((v.locator(), v.error)) 725 if err: 726 raise KeepWriteError("Error writing some blocks", err, label="block") 727 728 for k,v in items: 729 # flush again with sync=True to remove committed bufferblocks from 730 # the segments. 731 if v.owner: 732 if isinstance(v.owner, ArvadosFile): 733 v.owner.flush(sync=True) 734 elif isinstance(v.owner, list) and len(v.owner) > 0: 735 # This bufferblock is referenced by many files as a result 736 # of repacking small blocks, so don't delete it when flushing 737 # its owners, just do it after flushing them all. 738 for owner in v.owner: 739 owner.flush(sync=True) 740 self.delete_bufferblock(k) 741 742 self.stop_threads() 743 744 def block_prefetch(self, locator): 745 """Initiate a background download of a block. 746 """ 747 748 if not self.prefetch_lookahead: 749 return 750 751 with self.lock: 752 if locator in self._bufferblocks: 753 return 754 755 self._keep.block_prefetch(locator) 756 757 758class ArvadosFile(object): 759 """Represent a file in a Collection. 760 761 ArvadosFile manages the underlying representation of a file in Keep as a 762 sequence of segments spanning a set of blocks, and implements random 763 read/write access. 764 765 This object may be accessed from multiple threads. 766 767 """ 768 769 __slots__ = ('parent', 'name', '_writers', '_committed', 770 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter') 771 772 def __init__(self, parent, name, stream=[], segments=[]): 773 """ 774 ArvadosFile constructor. 775 776 :stream: 777 a list of Range objects representing a block stream 778 779 :segments: 780 a list of Range objects representing segments 781 """ 782 self.parent = parent 783 self.name = name 784 self._writers = set() 785 self._committed = False 786 self._segments = [] 787 self.lock = parent.root_collection().lock 788 for s in segments: 789 self._add_segment(stream, s.locator, s.range_size) 790 self._current_bblock = None 791 self._read_counter = 0 792 793 def writable(self): 794 return self.parent.writable() 795 796 @synchronized 797 def permission_expired(self, as_of_dt=None): 798 """Returns True if any of the segment's locators is expired""" 799 for r in self._segments: 800 if KeepLocator(r.locator).permission_expired(as_of_dt): 801 return True 802 return False 803 804 @synchronized 805 def has_remote_blocks(self): 806 """Returns True if any of the segment's locators has a +R signature""" 807 808 for s in self._segments: 809 if '+R' in s.locator: 810 return True 811 return False 812 813 @synchronized 814 def _copy_remote_blocks(self, remote_blocks={}): 815 """Ask Keep to copy remote blocks and point to their local copies. 816 817 This is called from the parent Collection. 818 819 :remote_blocks: 820 Shared cache of remote to local block mappings. This is used to avoid 821 doing extra work when blocks are shared by more than one file in 822 different subdirectories. 
823 """ 824 825 for s in self._segments: 826 if '+R' in s.locator: 827 try: 828 loc = remote_blocks[s.locator] 829 except KeyError: 830 loc = self.parent._my_keep().refresh_signature(s.locator) 831 remote_blocks[s.locator] = loc 832 s.locator = loc 833 self.parent.set_committed(False) 834 return remote_blocks 835 836 @synchronized 837 def segments(self): 838 return copy.copy(self._segments) 839 840 @synchronized 841 def clone(self, new_parent, new_name): 842 """Make a copy of this file.""" 843 cp = ArvadosFile(new_parent, new_name) 844 cp.replace_contents(self) 845 return cp 846 847 @must_be_writable 848 @synchronized 849 def replace_contents(self, other): 850 """Replace segments of this file with segments from another `ArvadosFile` object.""" 851 852 eventtype = TOK if self == other else MOD 853 854 map_loc = {} 855 self._segments = [] 856 for other_segment in other.segments(): 857 new_loc = other_segment.locator 858 if other.parent._my_block_manager().is_bufferblock(other_segment.locator): 859 if other_segment.locator not in map_loc: 860 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator) 861 if bufferblock.state() != _BufferBlock.WRITABLE: 862 map_loc[other_segment.locator] = bufferblock.locator() 863 else: 864 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid 865 new_loc = map_loc[other_segment.locator] 866 867 self._segments.append(streams.Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) 868 869 self.set_committed(False) 870 self.parent.notify(eventtype, self.parent, self.name, (self, self)) 871 872 def __eq__(self, other): 873 if other is self: 874 return True 875 if not isinstance(other, ArvadosFile): 876 return False 877 878 othersegs = other.segments() 879 with self.lock: 880 if len(self._segments) != len(othersegs): 881 return False 882 for i in range(0, len(othersegs)): 883 seg1 = self._segments[i] 884 seg2 = othersegs[i] 885 loc1 = seg1.locator 886 loc2 = seg2.locator 887 888 if self.parent._my_block_manager().is_bufferblock(loc1): 889 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator() 890 891 if other.parent._my_block_manager().is_bufferblock(loc2): 892 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator() 893 894 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or 895 seg1.range_start != seg2.range_start or 896 seg1.range_size != seg2.range_size or 897 seg1.segment_offset != seg2.segment_offset): 898 return False 899 900 return True 901 902 def __ne__(self, other): 903 return not self.__eq__(other) 904 905 @synchronized 906 def set_segments(self, segs): 907 self._segments = segs 908 909 @synchronized 910 def set_committed(self, value=True): 911 """Set committed flag. 912 913 If value is True, set committed to be True. 914 915 If value is False, set committed to be False for this and all parents. 
916 """ 917 if value == self._committed: 918 return 919 self._committed = value 920 if self._committed is False and self.parent is not None: 921 self.parent.set_committed(False) 922 923 @synchronized 924 def committed(self): 925 """Get whether this is committed or not.""" 926 return self._committed 927 928 @synchronized 929 def add_writer(self, writer): 930 """Add an ArvadosFileWriter reference to the list of writers""" 931 if isinstance(writer, ArvadosFileWriter): 932 self._writers.add(writer) 933 934 @synchronized 935 def remove_writer(self, writer, flush): 936 """ 937 Called from ArvadosFileWriter.close(). Remove a writer reference from the list 938 and do some block maintenance tasks. 939 """ 940 self._writers.remove(writer) 941 942 if flush or self.size() > config.KEEP_BLOCK_SIZE // 2: 943 # File writer closed, not small enough for repacking 944 self.flush() 945 elif self.closed(): 946 # All writers closed and size is adequate for repacking 947 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size()) 948 949 def closed(self): 950 """ 951 Get whether this is closed or not. When the writers list is empty, the file 952 is supposed to be closed. 953 """ 954 return len(self._writers) == 0 955 956 @must_be_writable 957 @synchronized 958 def truncate(self, size): 959 """Shrink or expand the size of the file. 960 961 If `size` is less than the size of the file, the file contents after 962 `size` will be discarded. If `size` is greater than the current size 963 of the file, it will be filled with zero bytes. 964 965 """ 966 if size < self.size(): 967 new_segs = [] 968 for r in self._segments: 969 range_end = r.range_start+r.range_size 970 if r.range_start >= size: 971 # segment is past the trucate size, all done 972 break 973 elif size < range_end: 974 nr = streams.Range(r.locator, r.range_start, size - r.range_start, 0) 975 nr.segment_offset = r.segment_offset 976 new_segs.append(nr) 977 break 978 else: 979 new_segs.append(r) 980 981 self._segments = new_segs 982 self.set_committed(False) 983 elif size > self.size(): 984 padding = self.parent._my_block_manager().get_padding_block() 985 diff = size - self.size() 986 while diff > config.KEEP_BLOCK_SIZE: 987 self._segments.append(streams.Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0)) 988 diff -= config.KEEP_BLOCK_SIZE 989 if diff > 0: 990 self._segments.append(streams.Range(padding.blockid, self.size(), diff, 0)) 991 self.set_committed(False) 992 else: 993 # size == self.size() 994 pass 995 996 def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False): 997 """Read up to `size` bytes from the file starting at `offset`. 998 999 Arguments: 1000 1001 * exact: bool --- If False (default), return less data than 1002 requested if the read crosses a block boundary and the next 1003 block isn't cached. If True, only return less data than 1004 requested when hitting EOF. 1005 1006 * return_memoryview: bool --- If False (default) return a 1007 `bytes` object, which may entail making a copy in some 1008 situations. If True, return a `memoryview` object which may 1009 avoid making a copy, but may be incompatible with code 1010 expecting a `bytes` object. 
1011 1012 """ 1013 1014 with self.lock: 1015 if size == 0 or offset >= self.size(): 1016 return memoryview(b'') if return_memoryview else b'' 1017 readsegs = streams.locators_and_ranges(self._segments, offset, size) 1018 1019 prefetch = None 1020 prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead 1021 if prefetch_lookahead: 1022 # Doing prefetch on every read() call is surprisingly expensive 1023 # when we're trying to deliver data at 600+ MiBps and want 1024 # the read() fast path to be as lightweight as possible. 1025 # 1026 # Only prefetching every 128 read operations 1027 # dramatically reduces the overhead while still 1028 # getting the benefit of prefetching (e.g. when 1029 # reading 128 KiB at a time, it checks for prefetch 1030 # every 16 MiB). 1031 self._read_counter = (self._read_counter+1) % 128 1032 if self._read_counter == 1: 1033 prefetch = streams.locators_and_ranges( 1034 self._segments, 1035 offset + size, 1036 config.KEEP_BLOCK_SIZE * prefetch_lookahead, 1037 limit=(1+prefetch_lookahead), 1038 ) 1039 1040 locs = set() 1041 data = [] 1042 for lr in readsegs: 1043 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact)) 1044 if block: 1045 blockview = memoryview(block) 1046 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size]) 1047 locs.add(lr.locator) 1048 else: 1049 break 1050 1051 if prefetch: 1052 for lr in prefetch: 1053 if lr.locator not in locs: 1054 self.parent._my_block_manager().block_prefetch(lr.locator) 1055 locs.add(lr.locator) 1056 1057 if len(data) == 1: 1058 return data[0] if return_memoryview else data[0].tobytes() 1059 else: 1060 return memoryview(b''.join(data)) if return_memoryview else b''.join(data) 1061 1062 1063 @must_be_writable 1064 @synchronized 1065 def writeto(self, offset, data, num_retries): 1066 """Write `data` to the file starting at `offset`. 1067 1068 This will update existing bytes and/or extend the size of the file as 1069 necessary. 
1070 1071 """ 1072 if not isinstance(data, bytes) and not isinstance(data, memoryview): 1073 data = data.encode() 1074 if len(data) == 0: 1075 return 1076 1077 if offset > self.size(): 1078 self.truncate(offset) 1079 1080 if len(data) > config.KEEP_BLOCK_SIZE: 1081 # Chunk it up into smaller writes 1082 n = 0 1083 dataview = memoryview(data) 1084 while n < len(data): 1085 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries) 1086 n += config.KEEP_BLOCK_SIZE 1087 return 1088 1089 self.set_committed(False) 1090 1091 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: 1092 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) 1093 1094 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: 1095 self._current_bblock.repack_writes() 1096 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: 1097 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) 1098 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) 1099 1100 self._current_bblock.append(data) 1101 streams.replace_range( 1102 self._segments, 1103 offset, 1104 len(data), 1105 self._current_bblock.blockid, 1106 self._current_bblock.write_pointer - len(data), 1107 ) 1108 self.parent.notify(WRITE, self.parent, self.name, (self, self)) 1109 return len(data) 1110 1111 @synchronized 1112 def flush(self, sync=True, num_retries=0): 1113 """Flush the current bufferblock to Keep. 1114 1115 :sync: 1116 If True, commit block synchronously, wait until buffer block has been written. 1117 If False, commit block asynchronously, return immediately after putting block into 1118 the keep put queue. 1119 """ 1120 if self.committed(): 1121 return 1122 1123 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: 1124 if self._current_bblock.state() == _BufferBlock.WRITABLE: 1125 self._current_bblock.repack_writes() 1126 if self._current_bblock.state() != _BufferBlock.DELETED: 1127 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) 1128 1129 if sync: 1130 to_delete = set() 1131 for s in self._segments: 1132 bb = self.parent._my_block_manager().get_bufferblock(s.locator) 1133 if bb: 1134 if bb.state() != _BufferBlock.COMMITTED: 1135 self.parent._my_block_manager().commit_bufferblock(bb, sync=True) 1136 to_delete.add(s.locator) 1137 s.locator = bb.locator() 1138 for s in to_delete: 1139 # Don't delete the bufferblock if it's owned by many files. It'll be 1140 # deleted after all of its owners are flush()ed. 1141 if self.parent._my_block_manager().get_bufferblock(s).owner is self: 1142 self.parent._my_block_manager().delete_bufferblock(s) 1143 1144 self.parent.notify(MOD, self.parent, self.name, (self, self)) 1145 1146 @must_be_writable 1147 @synchronized 1148 def add_segment(self, blocks, pos, size): 1149 """Add a segment to the end of the file. 
1150 1151 `pos` and `offset` reference a section of the stream described by 1152 `blocks` (a list of Range objects) 1153 1154 """ 1155 self._add_segment(blocks, pos, size) 1156 1157 def _add_segment(self, blocks, pos, size): 1158 """Internal implementation of add_segment.""" 1159 self.set_committed(False) 1160 for lr in streams.locators_and_ranges(blocks, pos, size): 1161 last = self._segments[-1] if self._segments else streams.Range(0, 0, 0, 0) 1162 r = streams.Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) 1163 self._segments.append(r) 1164 1165 @synchronized 1166 def size(self): 1167 """Get the file size.""" 1168 if self._segments: 1169 n = self._segments[-1] 1170 return n.range_start + n.range_size 1171 else: 1172 return 0 1173 1174 @synchronized 1175 def manifest_text(self, stream_name=".", portable_locators=False, 1176 normalize=False, only_committed=False): 1177 buf = "" 1178 filestream = [] 1179 for segment in self._segments: 1180 loc = segment.locator 1181 if self.parent._my_block_manager().is_bufferblock(loc): 1182 if only_committed: 1183 continue 1184 loc = self.parent._my_block_manager().get_bufferblock(loc).locator() 1185 if portable_locators: 1186 loc = KeepLocator(loc).stripped() 1187 filestream.append(streams.LocatorAndRange( 1188 loc, 1189 KeepLocator(loc).size, 1190 segment.segment_offset, 1191 segment.range_size, 1192 )) 1193 buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream})) 1194 buf += "\n" 1195 return buf 1196 1197 @must_be_writable 1198 @synchronized 1199 def _reparent(self, newparent, newname): 1200 self.set_committed(False) 1201 self.flush(sync=True) 1202 self.parent.remove(self.name) 1203 self.parent = newparent 1204 self.name = newname 1205 self.lock = self.parent.root_collection().lock 1206 1207 1208class ArvadosFileReader(ArvadosFileReaderBase): 1209 """Wraps ArvadosFile in a file-like object supporting reading only. 1210 1211 Be aware that this class is NOT thread safe as there is no locking around 1212 updating file pointer. 1213 1214 """ 1215 1216 def __init__(self, arvadosfile, mode="r", num_retries=None): 1217 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries) 1218 self.arvadosfile = arvadosfile 1219 1220 def size(self): 1221 return self.arvadosfile.size() 1222 1223 def stream_name(self): 1224 return self.arvadosfile.parent.stream_name() 1225 1226 def readinto(self, b): 1227 data = self.read(len(b)) 1228 b[:len(data)] = data 1229 return len(data) 1230 1231 @_FileLikeObjectBase._before_close 1232 @retry_method 1233 def read(self, size=-1, num_retries=None, return_memoryview=False): 1234 """Read up to `size` bytes from the file and return the result. 1235 1236 Starts at the current file position. If `size` is negative or None, 1237 read the entire remainder of the file. 1238 1239 Returns None if the file pointer is at the end of the file. 1240 1241 Returns a `bytes` object, unless `return_memoryview` is True, 1242 in which case it returns a memory view, which may avoid an 1243 unnecessary data copy in some situations. 1244 1245 """ 1246 if size < 0 or size is None: 1247 data = [] 1248 # 1249 # specify exact=False, return_memoryview=True here so that we 1250 # only copy data once into the final buffer. 
1251 # 1252 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True) 1253 while rd: 1254 data.append(rd) 1255 self._filepos += len(rd) 1256 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True) 1257 return memoryview(b''.join(data)) if return_memoryview else b''.join(data) 1258 else: 1259 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview) 1260 self._filepos += len(data) 1261 return data 1262 1263 @_FileLikeObjectBase._before_close 1264 @retry_method 1265 def readfrom(self, offset, size, num_retries=None, return_memoryview=False): 1266 """Read up to `size` bytes from the stream, starting at the specified file offset. 1267 1268 This method does not change the file position. 1269 1270 Returns a `bytes` object, unless `return_memoryview` is True, 1271 in which case it returns a memory view, which may avoid an 1272 unnecessary data copy in some situations. 1273 1274 """ 1275 return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview) 1276 1277 def flush(self): 1278 pass 1279 1280 1281class ArvadosFileWriter(ArvadosFileReader): 1282 """Wraps ArvadosFile in a file-like object supporting both reading and writing. 1283 1284 Be aware that this class is NOT thread safe as there is no locking around 1285 updating file pointer. 1286 1287 """ 1288 1289 def __init__(self, arvadosfile, mode, num_retries=None): 1290 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries) 1291 self.arvadosfile.add_writer(self) 1292 1293 def writable(self): 1294 return True 1295 1296 @_FileLikeObjectBase._before_close 1297 @retry_method 1298 def write(self, data, num_retries=None): 1299 if self.mode[0] == "a": 1300 self._filepos = self.size() 1301 self.arvadosfile.writeto(self._filepos, data, num_retries) 1302 self._filepos += len(data) 1303 return len(data) 1304 1305 @_FileLikeObjectBase._before_close 1306 @retry_method 1307 def writelines(self, seq, num_retries=None): 1308 for s in seq: 1309 self.write(s, num_retries=num_retries) 1310 1311 @_FileLikeObjectBase._before_close 1312 def truncate(self, size=None): 1313 if size is None: 1314 size = self._filepos 1315 self.arvadosfile.truncate(size) 1316 1317 @_FileLikeObjectBase._before_close 1318 def flush(self): 1319 self.arvadosfile.flush() 1320 1321 def close(self, flush=True): 1322 if not self.closed: 1323 self.arvadosfile.remove_writer(self, flush) 1324 super(ArvadosFileWriter, self).close() 1325 1326 1327class WrappableFile(object): 1328 """An interface to an Arvados file that's compatible with io wrappers. 1329 1330 """ 1331 def __init__(self, f): 1332 self.f = f 1333 self.closed = False 1334 def close(self): 1335 self.closed = True 1336 return self.f.close() 1337 def flush(self): 1338 return self.f.flush() 1339 def read(self, *args, **kwargs): 1340 return self.f.read(*args, **kwargs) 1341 def readable(self): 1342 return self.f.readable() 1343 def readinto(self, *args, **kwargs): 1344 return self.f.readinto(*args, **kwargs) 1345 def seek(self, *args, **kwargs): 1346 return self.f.seek(*args, **kwargs) 1347 def seekable(self): 1348 return self.f.seekable() 1349 def tell(self): 1350 return self.f.tell() 1351 def writable(self): 1352 return self.f.writable() 1353 def write(self, *args, **kwargs): 1354 return self.f.write(*args, **kwargs)
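split() is a small helper used throughout the collection code to separate the stream (directory) part of a path from the file name; when the path contains no slash, the stream defaults to '.'. For example:

    from arvados.arvfile import split

    assert split('output/logs/stderr.txt') == ('output/logs', 'stderr.txt')
    assert split('stderr.txt') == ('.', 'stderr.txt')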
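ArvadosFileReaderBase supplies the familiar file-object surface on top of the low-level readfrom(): seek()/tell() with the usual os.SEEK_* whence values, readline() and line iteration, and readall(), a generator that yields the file in fixed-size chunks. A sketch, reusing the hypothetical coll from above; Collection.find() is Collection API (outside this module) assumed to return the underlying ArvadosFile for a path.

    import os
    from arvados.arvfile import ArvadosFileReader

    reader = ArvadosFileReader(coll.find('subdir/notes.txt'))

    reader.seek(0, os.SEEK_END)           # returns the new absolute offset
    length = reader.tell()
    reader.seek(0)                        # back to the beginning

    first = reader.readline()             # decoded str, including the trailing '\n'

    # Stream the remainder in 1 MiB chunks rather than reading everything at once.
    for chunk in reader.readall(size=2**20):
        pass                              # each chunk is a bytes object

    reader.close()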
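readall_decompressed() looks at the file name and transparently streams decompressed content for '.gz' and '.bz2' files, falling back to readall() for anything else; decompressed_name() strips the matching suffix. A sketch, assuming the hypothetical coll also contains a gzip-compressed file at a made-up path:

    gz = ArvadosFileReader(coll.find('results/calls.vcf.gz'))

    # Write the decompressed bytes to a local file; decompressed_name() is 'calls.vcf'.
    with open(gz.decompressed_name(), 'wb') as out:
        for chunk in gz.readall_decompressed(size=2**20):
            out.write(chunk)
    gz.close()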
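_BufferBlock and StateChangeError are internal to the write path, but the state machine is easy to exercise in isolation: a block starts out WRITABLE, grows its bytearray by doubling as data is appended, and set_state() only accepts the transitions listed in STATE_TRANSITIONS. A standalone sketch (nothing here touches Keep, since the block is never actually uploaded):

    from arvados.arvfile import _BufferBlock, StateChangeError

    bb = _BufferBlock('bufferblock0', starting_capacity=2**14, owner=None)
    bb.append(b'hello ')
    bb.append('world')                    # str input is encoded to bytes automatically
    print(bb.size())                      # 11
    print(bb.locator())                   # md5 of the contents, plus '+11'

    bb.set_state(_BufferBlock.PENDING)    # WRITABLE -> PENDING: allowed; appends now fail
    try:
        bb.set_state(_BufferBlock.WRITABLE)
    except StateChangeError:
        pass                              # PENDING -> WRITABLE is not a legal transition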
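ArvadosFile.writeto() and readfrom() implement the random-access layer underneath the wrappers: writing past the current end of file, or calling truncate() with a larger size, extends the file with the gap backed by the block manager's shared zero-padding block. At the wrapper level this looks like ordinary seek-and-write. A sketch, again using the hypothetical coll and a made-up path:

    # Create a sparse file: writes past EOF are padded with zeros.
    with coll.open('data/sparse.bin', 'wb') as f:
        f.write(b'\xff' * 16)             # bytes 0-15
        f.seek(1024)
        f.write(b'\xee' * 16)             # bytes 1024-1039; bytes 16-1023 become zeros

    af = coll.find('data/sparse.bin')     # the underlying ArvadosFile
    af.truncate(4096)                     # extend to 4 KiB; the new tail is zero padding

    # readfrom() reads at an absolute offset without moving the file pointer.
    r = ArvadosFileReader(af)
    assert r.size() == 4096
    assert r.readfrom(1024, 16) == b'\xee' * 16
    assert r.readfrom(512, 16) == b'\x00' * 16
    r.close()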
1151 1152 `pos` and `offset` reference a section of the stream described by 1153 `blocks` (a list of Range objects) 1154 1155 """ 1156 self._add_segment(blocks, pos, size) 1157 1158 def _add_segment(self, blocks, pos, size): 1159 """Internal implementation of add_segment.""" 1160 self.set_committed(False) 1161 for lr in streams.locators_and_ranges(blocks, pos, size): 1162 last = self._segments[-1] if self._segments else streams.Range(0, 0, 0, 0) 1163 r = streams.Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) 1164 self._segments.append(r) 1165 1166 @synchronized 1167 def size(self): 1168 """Get the file size.""" 1169 if self._segments: 1170 n = self._segments[-1] 1171 return n.range_start + n.range_size 1172 else: 1173 return 0 1174 1175 @synchronized 1176 def manifest_text(self, stream_name=".", portable_locators=False, 1177 normalize=False, only_committed=False): 1178 buf = "" 1179 filestream = [] 1180 for segment in self._segments: 1181 loc = segment.locator 1182 if self.parent._my_block_manager().is_bufferblock(loc): 1183 if only_committed: 1184 continue 1185 loc = self.parent._my_block_manager().get_bufferblock(loc).locator() 1186 if portable_locators: 1187 loc = KeepLocator(loc).stripped() 1188 filestream.append(streams.LocatorAndRange( 1189 loc, 1190 KeepLocator(loc).size, 1191 segment.segment_offset, 1192 segment.range_size, 1193 )) 1194 buf += ' '.join(streams.normalize_stream(stream_name, {self.name: filestream})) 1195 buf += "\n" 1196 return buf 1197 1198 @must_be_writable 1199 @synchronized 1200 def _reparent(self, newparent, newname): 1201 self.set_committed(False) 1202 self.flush(sync=True) 1203 self.parent.remove(self.name) 1204 self.parent = newparent 1205 self.name = newname 1206 self.lock = self.parent.root_collection().lock
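The manifest_text() method above serializes the file's segments in the Keep manifest format: a stream name, a list of block locators, and position:size:filename tokens, one stream per line. The following is an illustrative sketch only, assuming the high-level arvados.collection.Collection API and a configured Arvados client; the locator and output shown are examples, and real output normally carries +A permission signatures.

import arvados.collection

# Sketch (assumed API): build a one-file collection and look at its manifest.
coll = arvados.collection.Collection()
with coll.open('foo.txt', 'w') as f:
    f.write('foo')

# Expected shape of the manifest text (permission signature omitted):
#   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
print(coll.manifest_text())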
Represent a file in a Collection.
ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.
This object may be accessed from multiple threads.
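In normal use, ArvadosFile instances are created and owned by a Collection, which parses them out of a manifest and shares a single lock across the tree. A minimal sketch, assuming the arvados.collection.Collection API and its find() method:

import arvados.collection

# Sketch only: the manifest below describes one 3-byte file, 'foo.txt'.
manifest = ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"
coll = arvados.collection.Collection(manifest)

arvfile = coll.find('foo.txt')   # assumed to return the underlying ArvadosFile
print(arvfile.size())            # 3 -- derived from the segment list alone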
773 def __init__(self, parent, name, stream=[], segments=[]): 774 """ 775 ArvadosFile constructor. 776 777 :stream: 778 a list of Range objects representing a block stream 779 780 :segments: 781 a list of Range objects representing segments 782 """ 783 self.parent = parent 784 self.name = name 785 self._writers = set() 786 self._committed = False 787 self._segments = [] 788 self.lock = parent.root_collection().lock 789 for s in segments: 790 self._add_segment(stream, s.locator, s.range_size) 791 self._current_bblock = None 792 self._read_counter = 0
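The stream and segments arguments are lists of Range objects: each segment records where it starts in the file (range_start), how long it is (range_size), and where its bytes sit inside a block (segment_offset). The self-contained sketch below is not SDK code; it only mirrors how size() and offset lookups fall out of that representation:

from collections import namedtuple

# Stand-in for the SDK's internal Range type, for illustration only.
Segment = namedtuple('Segment', ['locator', 'range_start', 'range_size', 'segment_offset'])

segments = [
    Segment('blockA+1000', 0, 1000, 0),      # file bytes 0..999 start at offset 0 of blockA
    Segment('blockB+500', 1000, 200, 300),   # file bytes 1000..1199 start at offset 300 of blockB
]

def file_size(segs):
    # Same idea as ArvadosFile.size(): the end of the last segment.
    last = segs[-1]
    return last.range_start + last.range_size

def locate(segs, offset):
    # Map a file offset to (block locator, offset within that block).
    for s in segs:
        if s.range_start <= offset < s.range_start + s.range_size:
            return s.locator, s.segment_offset + (offset - s.range_start)
    raise ValueError("offset past end of file")

print(file_size(segments))     # 1200
print(locate(segments, 1050))  # ('blockB+500', 350)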
797 @synchronized 798 def permission_expired(self, as_of_dt=None): 799 """Returns True if any of the segment's locators is expired""" 800 for r in self._segments: 801 if KeepLocator(r.locator).permission_expired(as_of_dt): 802 return True 803 return False
805 @synchronized 806 def has_remote_blocks(self): 807 """Returns True if any of the segment's locators has a +R signature""" 808 809 for s in self._segments: 810 if '+R' in s.locator: 811 return True 812 return False
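Both permission_expired() and has_remote_blocks() work by inspecting the hints carried on each segment's locator string: a +A hint holds the permission signature and its expiry, and a +R hint marks a block signed by a remote cluster. A hedged sketch of the same checks on a single locator (the hash, signature, and timestamp below are fabricated):

from arvados.keep import KeepLocator

# Fabricated example locator: md5+size, plus a +A permission hint whose
# expiry is a hexadecimal timestamp.
signed = "acbd18db4cc2f85cedef654fccc4a4d8+3+A0123456789abcdef0123456789abcdef01234567@65f00000"

loc = KeepLocator(signed)
print(loc.permission_expired())   # True once the +A expiry is in the past

# has_remote_blocks() simply looks for a +R hint on each segment locator.
print('+R' in signed)             # False for this locator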
1209class ArvadosFileReader(ArvadosFileReaderBase): 1210 """Wraps ArvadosFile in a file-like object supporting reading only. 1211 1212 Be aware that this class is NOT thread safe as there is no locking around 1213 updating file pointer. 1214 1215 """ 1216 1217 def __init__(self, arvadosfile, mode="r", num_retries=None): 1218 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries) 1219 self.arvadosfile = arvadosfile 1220 1221 def size(self): 1222 return self.arvadosfile.size() 1223 1224 def stream_name(self): 1225 return self.arvadosfile.parent.stream_name() 1226 1227 def readinto(self, b): 1228 data = self.read(len(b)) 1229 b[:len(data)] = data 1230 return len(data) 1231 1232 @_FileLikeObjectBase._before_close 1233 @retry_method 1234 def read(self, size=-1, num_retries=None, return_memoryview=False): 1235 """Read up to `size` bytes from the file and return the result. 1236 1237 Starts at the current file position. If `size` is negative or None, 1238 read the entire remainder of the file. 1239 1240 Returns None if the file pointer is at the end of the file. 1241 1242 Returns a `bytes` object, unless `return_memoryview` is True, 1243 in which case it returns a memory view, which may avoid an 1244 unnecessary data copy in some situations. 1245 1246 """ 1247 if size < 0 or size is None: 1248 data = [] 1249 # 1250 # specify exact=False, return_memoryview=True here so that we 1251 # only copy data once into the final buffer. 1252 # 1253 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True) 1254 while rd: 1255 data.append(rd) 1256 self._filepos += len(rd) 1257 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True) 1258 return memoryview(b''.join(data)) if return_memoryview else b''.join(data) 1259 else: 1260 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview) 1261 self._filepos += len(data) 1262 return data 1263 1264 @_FileLikeObjectBase._before_close 1265 @retry_method 1266 def readfrom(self, offset, size, num_retries=None, return_memoryview=False): 1267 """Read up to `size` bytes from the stream, starting at the specified file offset. 1268 1269 This method does not change the file position. 1270 1271 Returns a `bytes` object, unless `return_memoryview` is True, 1272 in which case it returns a memory view, which may avoid an 1273 unnecessary data copy in some situations. 1274 1275 """ 1276 return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview) 1277 1278 def flush(self): 1279 pass
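A short usage sketch for ArvadosFileReader; as the docstring warns, the file position is unsynchronized shared state, so each thread should use its own reader. Here coll is assumed to be an arvados.collection.Collection whose blocks can be fetched from Keep:

# Sketch only: Collection.find() is assumed to return the underlying ArvadosFile.
arvfile = coll.find('data.bin')
with ArvadosFileReader(arvfile, num_retries=3) as reader:
    header = reader.read(16)                  # advances the shared file position
    reader.seek(0)                            # rewind
    chunk = reader.readfrom(0, 2**20)         # positional read; file position unchanged
    view = reader.read(2**20, return_memoryview=True)   # may avoid an extra copy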
1282class ArvadosFileWriter(ArvadosFileReader): 1283 """Wraps ArvadosFile in a file-like object supporting both reading and writing. 1284 1285 Be aware that this class is NOT thread safe as there is no locking around 1286 updating file pointer. 1287 1288 """ 1289 1290 def __init__(self, arvadosfile, mode, num_retries=None): 1291 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries) 1292 self.arvadosfile.add_writer(self) 1293 1294 def writable(self): 1295 return True 1296 1297 @_FileLikeObjectBase._before_close 1298 @retry_method 1299 def write(self, data, num_retries=None): 1300 if self.mode[0] == "a": 1301 self._filepos = self.size() 1302 self.arvadosfile.writeto(self._filepos, data, num_retries) 1303 self._filepos += len(data) 1304 return len(data) 1305 1306 @_FileLikeObjectBase._before_close 1307 @retry_method 1308 def writelines(self, seq, num_retries=None): 1309 for s in seq: 1310 self.write(s, num_retries=num_retries) 1311 1312 @_FileLikeObjectBase._before_close 1313 def truncate(self, size=None): 1314 if size is None: 1315 size = self._filepos 1316 self.arvadosfile.truncate(size) 1317 1318 @_FileLikeObjectBase._before_close 1319 def flush(self): 1320 self.arvadosfile.flush() 1321 1322 def close(self, flush=True): 1323 if not self.closed: 1324 self.arvadosfile.remove_writer(self, flush) 1325 super(ArvadosFileWriter, self).close()
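A matching sketch for ArvadosFileWriter. In append mode every write() first moves the file position to the end of the file, flush() forces the current bufferblock to Keep immediately, and close() (called here by the with block) defaults to flush=True so pending data is committed. coll is again an assumed writable Collection:

# Sketch only: assumes Collection.open() accepts binary append mode.
with coll.open('log.txt', 'ab') as writer:
    writer.write(b'first line\n')
    writer.writelines([b'second line\n', b'third line\n'])
    writer.flush()    # commit the current bufferblock to Keep now
# Leaving the block calls close(flush=True), committing anything still buffered.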
1328class WrappableFile(object): 1329 """An interface to an Arvados file that's compatible with io wrappers. 1330 1331 """ 1332 def __init__(self, f): 1333 self.f = f 1334 self.closed = False 1335 def close(self): 1336 self.closed = True 1337 return self.f.close() 1338 def flush(self): 1339 return self.f.flush() 1340 def read(self, *args, **kwargs): 1341 return self.f.read(*args, **kwargs) 1342 def readable(self): 1343 return self.f.readable() 1344 def readinto(self, *args, **kwargs): 1345 return self.f.readinto(*args, **kwargs) 1346 def seek(self, *args, **kwargs): 1347 return self.f.seek(*args, **kwargs) 1348 def seekable(self): 1349 return self.f.seekable() 1350 def tell(self): 1351 return self.f.tell() 1352 def writable(self): 1353 return self.f.writable() 1354 def write(self, *args, **kwargs): 1355 return self.f.write(*args, **kwargs)
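WrappableFile simply forwards the file-like methods and keeps its own closed flag, which is enough for the standard io wrappers to layer on top of an Arvados file object. A hedged sketch wrapping a binary reader for line-oriented text access:

import io

# Sketch: reuse the assumed 'coll' from above and decode a file as UTF-8 text.
reader = ArvadosFileReader(coll.find('notes.txt'))
text = io.TextIOWrapper(WrappableFile(reader), encoding='utf-8')
for line in text:
    print(line.rstrip())
text.close()   # closes the WrappableFile, which closes the underlying reader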