arvados.arvfile
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import queue
import re
import sys
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')

def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name
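For example, `split` keeps everything up to the final slash as the stream name, and falls back to '.' when there is no slash:

>>> split('data/raw/sample.fastq')
('data/raw', 'sample.fastq')
>>> split('sample.fastq')
('.', 'sample.fastq')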
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")
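A subclass only has to supply `size()` and `read()` to get `readline()`, `readlines()`, and line iteration from the base class. A minimal sketch with a hypothetical in-memory reader (not part of the SDK), purely to exercise the base class:

class _BytesReader(ArvadosFileReaderBase):
    """Hypothetical in-memory reader, for illustration only."""
    def __init__(self, data):
        super(_BytesReader, self).__init__('example.txt', 'rb')
        self._data = data

    def size(self):
        return len(self._data)

    def read(self, size, num_retries=None):
        chunk = self._data[self._filepos:self._filepos + size]
        self._filepos += len(chunk)
        return chunk

for line in _BytesReader(b'alpha\nbeta\n'):
    print(repr(line))   # 'alpha\n', then 'beta\n'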
class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper
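`synchronized` assumes the decorated method's instance has a `lock` attribute usable as a context manager, which `_BufferBlock`, `_BlockManager`, and `ArvadosFile` below all provide. A minimal sketch with a hypothetical class:

class _Counter(object):
    def __init__(self):
        self.lock = threading.Lock()   # the attribute synchronized relies on
        self.n = 0

    @synchronized
    def increment(self):
        # self.lock is already held here, so the update is atomic.
        self.n += 1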
class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
                data = data.encode()
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    STATE_TRANSITIONS = frozenset([
        (WRITABLE, PENDING),
        (PENDING, COMMITTED),
        (PENDING, ERROR),
        (ERROR, PENDING)])

    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)
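The expanding buffer in `append` doubles until the data fits. A minimal sketch of the growth arithmetic, exercising only the `_BufferBlock` class above:

bb = _BufferBlock('x', starting_capacity=8, owner=None)
bb.append(b'0123456789abcdef0')   # 17 bytes: capacity grows 8 -> 16 -> 32
assert len(bb.buffer_block) == 32
assert bb.size() == 17
assert bb.locator() == "%s+17" % hashlib.md5(b'0123456789abcdef0').hexdigest()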
class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2

    def __init__(self, keep,
                 copies=None,
                 put_threads=None,
                 num_retries=None,
                 storage_classes_func=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self.lock = threading.Lock()
        self.prefetch_lookahead = self._keep.num_prefetch_threads
        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
        self.copies = copies
        self.storage_classes = storage_classes_func or (lambda: [])
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None
        self.num_retries = num_retries

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""

        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""

        self._pending_write_size += closed_file_size

        # Check whether there are enough small blocks to fill up one block in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Find blocks that are ready to be packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in self._bufferblocks.values()
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without an owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        new_bb.owner = []
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.blockid
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)
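    # A worked sketch of the threshold arithmetic above, with hypothetical
    # file sizes (config.KEEP_BLOCK_SIZE is 2**26 = 67108864 bytes, 64 MiB):
    #
    #   1000 closed 60 KiB files -> 1000 * 61440 = 61440000 pending bytes,
    #   still below KEEP_BLOCK_SIZE, so the early return above is taken.
    #   1100 such files -> 67584000 bytes >= KEEP_BLOCK_SIZE, so the files
    #   are repacked into one full block (or sooner with force=True, as
    #   commit_all() does).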
    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MiB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        if locator in self._bufferblocks:
            bb = self._bufferblocks[locator]
            bb.clear()
            del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and returns that;
        if not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = list(self._bufferblocks.items())

        for k, v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                # Ignore blocks with a list of owners: if they're not in
                # COMMITTED state, they're already being committed
                # asynchronously.
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

        err = []
        for k, v in items:
            if v.state() == _BufferBlock.ERROR:
                err.append((v.locator(), v.error))
        if err:
            raise KeepWriteError("Error writing some blocks", err, label="block")

        for k, v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=True)
                elif isinstance(v.owner, list) and len(v.owner) > 0:
                    # This bufferblock is referenced by many files as a result
                    # of repacking small blocks, so don't delete it when flushing
                    # its owners, just do it after flushing them all.
                    for owner in v.owner:
                        owner.flush(sync=True)
                    self.delete_bufferblock(k)

        self.stop_threads()

    def block_prefetch(self, locator):
        """Initiate a background download of a block.
        """

        if not self.prefetch_lookahead:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self._keep.block_prefetch(locator)
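Since the padding block is nothing but `KEEP_BLOCK_SIZE` zero bytes, the locator quoted in the `get_padding_block` docstring can be re-derived with the standard library alone:

import hashlib

KEEP_BLOCK_SIZE = 2**26   # 67108864, mirrors config.KEEP_BLOCK_SIZE
digest = hashlib.md5(b'\0' * KEEP_BLOCK_SIZE).hexdigest()
print("%s+%d" % (digest, KEEP_BLOCK_SIZE))
# Per the docstring above: 7f614da9329cd3aebf59b91aadc30bf0+67108864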
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
        self._read_counter = 0

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segments' locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segments' locators has a +R signature"""

        for s in self._segments:
            if '+R' in s.locator:
                return True
        return False

    @synchronized
    def _copy_remote_blocks(self, remote_blocks={}):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
          Shared cache of remote to local block mappings.  This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories.
        """

        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close().  Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not.  When the writers list is empty, the file
        is considered closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)

            prefetch = None
            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
            if prefetch_lookahead:
                # Doing prefetch on every read() call is surprisingly expensive
                # when we're trying to deliver data at 600+ MiBps and want
                # the read() fast path to be as lightweight as possible.
                #
                # Only prefetching every 128 read operations
                # dramatically reduces the overhead while still
                # getting the benefit of prefetching (e.g. when
                # reading 128 KiB at a time, it checks for prefetch
                # every 16 MiB).
                self._read_counter = (self._read_counter+1) % 128
                if self._read_counter == 1:
                    prefetch = locators_and_ranges(self._segments,
                                                   offset + size,
                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
                                                   limit=(1+prefetch_lookahead))

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                locs.add(lr.locator)
            else:
                break

        if prefetch:
            for lr in prefetch:
                if lr.locator not in locs:
                    self.parent._my_block_manager().block_prefetch(lr.locator)
                    locs.add(lr.locator)

        if len(data) == 1:
            return data[0]
        else:
            return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files.  It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `size` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
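`ArvadosFile` is normally reached through a collection rather than constructed directly; its `writeto()`/`readfrom()` are driven by the wrapper classes below. A hedged sketch of the usual round trip, assuming the `arvados.collection.Collection` API from this SDK:

import arvados.collection

c = arvados.collection.Collection()       # empty, writable collection
with c.open('counts.txt', 'wb') as f:     # an ArvadosFileWriter underneath
    f.write(b'7 lines\n')
with c.open('counts.txt', 'rb') as f:     # an ArvadosFileReader underneath
    print(f.read())                       # b'7 lines\n'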
class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass
1287 """ 1288 return self.arvadosfile.readfrom(offset, size, num_retries) 1289 1290 def flush(self): 1291 pass 1292 1293 1294class ArvadosFileWriter(ArvadosFileReader): 1295 """Wraps ArvadosFile in a file-like object supporting both reading and writing. 1296 1297 Be aware that this class is NOT thread safe as there is no locking around 1298 updating file pointer. 1299 1300 """ 1301 1302 def __init__(self, arvadosfile, mode, num_retries=None): 1303 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries) 1304 self.arvadosfile.add_writer(self) 1305 1306 def writable(self): 1307 return True 1308 1309 @_FileLikeObjectBase._before_close 1310 @retry_method 1311 def write(self, data, num_retries=None): 1312 if self.mode[0] == "a": 1313 self._filepos = self.size() 1314 self.arvadosfile.writeto(self._filepos, data, num_retries) 1315 self._filepos += len(data) 1316 return len(data) 1317 1318 @_FileLikeObjectBase._before_close 1319 @retry_method 1320 def writelines(self, seq, num_retries=None): 1321 for s in seq: 1322 self.write(s, num_retries=num_retries) 1323 1324 @_FileLikeObjectBase._before_close 1325 def truncate(self, size=None): 1326 if size is None: 1327 size = self._filepos 1328 self.arvadosfile.truncate(size) 1329 1330 @_FileLikeObjectBase._before_close 1331 def flush(self): 1332 self.arvadosfile.flush() 1333 1334 def close(self, flush=True): 1335 if not self.closed: 1336 self.arvadosfile.remove_writer(self, flush) 1337 super(ArvadosFileWriter, self).close() 1338 1339 1340class WrappableFile(object): 1341 """An interface to an Arvados file that's compatible with io wrappers. 1342 1343 """ 1344 def __init__(self, f): 1345 self.f = f 1346 self.closed = False 1347 def close(self): 1348 self.closed = True 1349 return self.f.close() 1350 def flush(self): 1351 return self.f.flush() 1352 def read(self, *args, **kwargs): 1353 return self.f.read(*args, **kwargs) 1354 def readable(self): 1355 return self.f.readable() 1356 def readinto(self, *args, **kwargs): 1357 return self.f.readinto(*args, **kwargs) 1358 def seek(self, *args, **kwargs): 1359 return self.f.seek(*args, **kwargs) 1360 def seekable(self): 1361 return self.f.seekable() 1362 def tell(self): 1363 return self.f.tell() 1364 def writable(self): 1365 return self.f.writable() 1366 def write(self, *args, **kwargs): 1367 return self.f.write(*args, **kwargs)
1153 """ 1154 if self.committed(): 1155 return 1156 1157 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: 1158 if self._current_bblock.state() == _BufferBlock.WRITABLE: 1159 self._current_bblock.repack_writes() 1160 if self._current_bblock.state() != _BufferBlock.DELETED: 1161 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) 1162 1163 if sync: 1164 to_delete = set() 1165 for s in self._segments: 1166 bb = self.parent._my_block_manager().get_bufferblock(s.locator) 1167 if bb: 1168 if bb.state() != _BufferBlock.COMMITTED: 1169 self.parent._my_block_manager().commit_bufferblock(bb, sync=True) 1170 to_delete.add(s.locator) 1171 s.locator = bb.locator() 1172 for s in to_delete: 1173 # Don't delete the bufferblock if it's owned by many files. It'll be 1174 # deleted after all of its owners are flush()ed. 1175 if self.parent._my_block_manager().get_bufferblock(s).owner is self: 1176 self.parent._my_block_manager().delete_bufferblock(s) 1177 1178 self.parent.notify(MOD, self.parent, self.name, (self, self)) 1179 1180 @must_be_writable 1181 @synchronized 1182 def add_segment(self, blocks, pos, size): 1183 """Add a segment to the end of the file. 1184 1185 `pos` and `offset` reference a section of the stream described by 1186 `blocks` (a list of Range objects) 1187 1188 """ 1189 self._add_segment(blocks, pos, size) 1190 1191 def _add_segment(self, blocks, pos, size): 1192 """Internal implementation of add_segment.""" 1193 self.set_committed(False) 1194 for lr in locators_and_ranges(blocks, pos, size): 1195 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) 1196 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) 1197 self._segments.append(r) 1198 1199 @synchronized 1200 def size(self): 1201 """Get the file size.""" 1202 if self._segments: 1203 n = self._segments[-1] 1204 return n.range_start + n.range_size 1205 else: 1206 return 0 1207 1208 @synchronized 1209 def manifest_text(self, stream_name=".", portable_locators=False, 1210 normalize=False, only_committed=False): 1211 buf = "" 1212 filestream = [] 1213 for segment in self._segments: 1214 loc = segment.locator 1215 if self.parent._my_block_manager().is_bufferblock(loc): 1216 if only_committed: 1217 continue 1218 loc = self.parent._my_block_manager().get_bufferblock(loc).locator() 1219 if portable_locators: 1220 loc = KeepLocator(loc).stripped() 1221 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 1222 segment.segment_offset, segment.range_size)) 1223 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream})) 1224 buf += "\n" 1225 return buf 1226 1227 @must_be_writable 1228 @synchronized 1229 def _reparent(self, newparent, newname): 1230 self.set_committed(False) 1231 self.flush(sync=True) 1232 self.parent.remove(self.name) 1233 self.parent = newparent 1234 self.name = newname 1235 self.lock = self.parent.root_collection().lock
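For orientation before the wrapper classes below, here is a minimal usage sketch (not part of this module, and hedged accordingly): it assumes the higher-level arvados.collection.Collection API, whose open() method returns the ArvadosFileWriter and ArvadosFileReader wrappers defined later in this file.

# Hypothetical sketch: exercising ArvadosFile's write/truncate/read
# semantics in memory through the assumed Collection API.
import arvados.collection

coll = arvados.collection.Collection()
with coll.open('example.txt', 'wb') as f:    # ArvadosFileWriter
    f.write(b'hello world')                  # buffered in a bufferblock
    f.truncate(5)                            # discard bytes past offset 5
with coll.open('example.txt', 'rb') as f:    # ArvadosFileReader
    assert f.read() == b'hello'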
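The manifest text assembled by manifest_text() follows the Keep manifest format: a stream name, one or more block locators (an MD5 hash, a +size hint, and optional further hints), then one file token per segment of the form position:size:filename. An illustrative line for the sketch above, where the block still holds the original eleven bytes but the file references only the first five:

. 5eb63bbbe01eeed093cb22bb8f5acdc3+11 0:5:example.txt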
class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass
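A short, hypothetical sketch of the distinction between read(), which advances the file pointer, and readfrom(), which performs a positional read and leaves it untouched (same assumed Collection API as above):

# Hypothetical sketch, same assumed Collection API as above.
import arvados.collection

coll = arvados.collection.Collection()
with coll.open('example.txt', 'wb') as f:
    f.write(b'hello')
with coll.open('example.txt', 'rb') as f:
    head = f.read(2)            # advances the file pointer: tell() == 2
    tail = f.readfrom(2, 3)     # positional read: pointer stays at 2
    assert head + tail == b'hello'
    assert f.tell() == 2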
class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
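Append mode is implemented in write() by moving the file pointer to EOF before each write, so seeks have no effect on where appended data lands. A hedged sketch, again via the assumed Collection API:

# Hypothetical sketch, same assumed Collection API as above.
import arvados.collection

coll = arvados.collection.Collection()
with coll.open('example.txt', 'wb') as f:
    f.write(b'hello')
with coll.open('example.txt', 'ab') as f:
    f.seek(0)                    # has no effect on writes in append mode:
    f.write(b'!')                # write() moves the pointer to EOF first
    f.writelines([b'a', b'b'])
with coll.open('example.txt', 'rb') as f:
    assert f.read() == b'hello!ab'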
class WrappableFile(object):
    """An interface to an Arvados file that's compatible with io wrappers.

    """
    def __init__(self, f):
        self.f = f
        self.closed = False
    def close(self):
        self.closed = True
        return self.f.close()
    def flush(self):
        return self.f.flush()
    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)
    def readable(self):
        return self.f.readable()
    def readinto(self, *args, **kwargs):
        return self.f.readinto(*args, **kwargs)
    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)
    def seekable(self):
        return self.f.seekable()
    def tell(self):
        return self.f.tell()
    def writable(self):
        return self.f.writable()
    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)
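WrappableFile exposes just the duck-typed surface the standard io module expects, so an Arvados file can sit beneath stock wrappers. A sketch of the pattern (the SDK's text-mode open path uses a similar arrangement; treat the exact wiring here as an assumption):

# Hypothetical sketch: layering a standard io wrapper over an Arvados file.
import io
import arvados.collection

coll = arvados.collection.Collection()
with coll.open('example.txt', 'wb') as f:
    f.write(b'hello\n')
text = io.TextIOWrapper(WrappableFile(coll.open('example.txt', 'rb')),
                        encoding='utf-8')
try:
    assert text.read() == 'hello\n'
finally:
    text.close()                 # close() propagates to the wrapped file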