# arvados.arvfile
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import listitems, listvalues
standard_library.install_aliases()
from builtins import range
from builtins import object
import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import queue
import re
import sys
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

# Event type markers passed to parent.notify() callbacks.
MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')

def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name


class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass


class _FileLikeObjectBase(object):
    # Minimal base class providing the common name/mode/closed state and the
    # closed-stream guard shared by the file reader/writer classes below.
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        # Decorator: reject I/O operations once the file has been closed.
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            # Do not mask an exception already propagating out of the
            # with-block; only re-raise close() errors on a clean exit.
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    # Shared read-side implementation (seek/readline/decompression helpers)
    # for the stream/file reader classes below.
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        # (position, leftover bytes) cached by readline() between calls.
        self._readline_cache = (None, None)

    def __iter__(self):
        # Iterate over lines, like a regular file object.
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        # Name with any trailing .bz2/.gz extension removed.
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        # Generator: yield successive chunks (up to `size` bytes each) until EOF.
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        """Read up to the next newline (or at most `size` bytes) as a str.

        Bytes read past the newline are cached so the next readline() call
        can resume from them without re-reading.
        """
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            # Resume from leftover bytes cached by the previous readline().
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        # Rewind the file position past any bytes read beyond the newline,
        # and cache those bytes for the next call.
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        # Feed readall() chunks through the supplied decompressor callable,
        # yielding only non-empty decompressed output.
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        # Choose a decompressor based on the file extension; fall back to a
        # plain readall() for anything that is not .bz2 or .gz.
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # 16+MAX_WBITS tells zlib to expect a gzip header.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        # Read (at least) `sizehint` bytes and split into lines, keeping the
        # line endings.
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        # Abstract: subclasses must implement.
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        # Abstract: subclasses must implement.
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        # Abstract: subclasses must implement.
        raise IOError(errno.ENOSYS, "Not implemented")


class StreamFileReader(ArvadosFileReaderBase):
    # Read-only view of a file described by a list of segments within a
    # manifest stream.
    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        # NOTE(review): raises IndexError if `segments` is empty -- callers
        # appear to guarantee at least one segment; confirm before reuse.
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        # Only the first chunk is returned: a short read is allowed when the
        # request crosses a segment boundary.
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        # Render this single file as a one-stream ('.') manifest text.
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    # Decorator: run the wrapped method while holding self.lock.
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    # Raised by _BufferBlock.set_state() on a transition that is not listed
    # in _BufferBlock.STATE_TRANSITIONS.
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate

class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
                data = data.encode()
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                # Double the capacity until the new data fits, copying the
                # bytes written so far into the fresh buffer.
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            # Contents changed, so any previously computed locator is stale.
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

    # Legal (state, nextstate) pairs for set_state().
    STATE_TRANSITIONS = frozenset([
        (WRITABLE, PENDING),
        (PENDING, COMMITTED),
        (PENDING, ERROR),
        (ERROR, PENDING)])
    @synchronized
    def set_state(self, nextstate, val=None):
        # Enforce the legal state machine; `val` carries the locator on
        # COMMITTED and the exception on ERROR.
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            # Release the internal buffer; the data now lives in Keep.
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        # Copy this (still uncommitted) block's contents into a new writable
        # _BufferBlock owned by `owner`.
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        # Drop the buffer and detach from the owner; the block becomes
        # unusable afterwards.
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference the buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Collect total data referenced by segments (could be smaller than
        # bufferblock size if a portion of the file was written and
        # then overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, it is
            # due to out-of-order writes and will produce a fragmented
            # manifest, so try to optimize by re-packing into a new buffer.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            # Adopt the repacked buffer in place of our own.
            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)


class NoopLock(object):
    # Lock-interface stand-in used where no real locking is needed.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    # Decorator: raise EROFS unless self.writable() is true.
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper
class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2

    def __init__(self, keep,
                 copies=None,
                 put_threads=None,
                 num_retries=None,
                 storage_classes_func=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self.lock = threading.Lock()
        self.prefetch_lookahead = self._keep.num_prefetch_threads
        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
        self.copies = copies
        self.storage_classes = storage_classes_func or (lambda: [])
        # Total bytes in closed small files awaiting repack; see
        # repack_small_blocks().
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None
        self.num_retries = num_retries

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        # Lock-free internals of alloc_bufferblock(), for callers that
        # already hold self.lock.
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""

        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    # None is the shutdown sentinel pushed by stop_threads().
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can quickly
                # grow to take up gigabytes of RAM if the writing process is
                # generating data more quickly than it can be sent to the Keep
                # servers.
                #
                # With two upload threads and a queue size of 2, this means up to 4
                # blocks pending.  If they are full 64 MiB blocks, that means up to
                # 256 MiB of internal buffering, which is the same size as the
                # default download block cache in KeepClient.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            # Push one None sentinel per worker, then join them all.
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()
    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in listvalues(self._bufferblocks)
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        # The merged block is co-owned by every file it absorbs.
        new_bb.owner = []
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        # Rewrite each absorbed file's segments to point into the merged
        # block, then delete the old small block.
        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.blockid
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so to disallow any more appends.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                # Another caller already started this upload; either wait for
                # it (sync) or just let it proceed (async).
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            # bytearray(n) is already zero-filled; just mark the buffer full.
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        # Lock-free internals of delete_bufferblock(), for callers that
        # already hold self.lock.
        if locator in self._bufferblocks:
            bb = self._bufferblocks[locator]
            bb.clear()
            del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and return that, if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    # Committed: the buffer is gone, fall through to Keep
                    # using the real locator.
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = listitems(self._bufferblocks)

        for k,v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                # Ignore blocks with a list of owners, as if they're not in COMMITTED
                # state, they're already being committed asynchronously.
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

        err = []
        for k,v in items:
            if v.state() == _BufferBlock.ERROR:
                err.append((v.locator(), v.error))
        if err:
            raise KeepWriteError("Error writing some blocks", err, label="block")

        for k,v in items:
            # flush again with sync=True to remove committed bufferblocks from
            # the segments.
            if v.owner:
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=True)
                elif isinstance(v.owner, list) and len(v.owner) > 0:
                    # This bufferblock is referenced by many files as a result
                    # of repacking small blocks, so don't delete it when flushing
                    # its owners, just do it after flushing them all.
                    for owner in v.owner:
                        owner.flush(sync=True)
                    self.delete_bufferblock(k)

        self.stop_threads()

    def block_prefetch(self, locator):
        """Initiate a background download of a block.
        """

        if not self.prefetch_lookahead:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self._keep.block_prefetch(locator)


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        # NOTE(review): the mutable default arguments are shared across
        # calls; safe here only because they are never mutated.
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        # Lock is shared with the root collection so @synchronized methods
        # serialize against collection-level operations.
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None
        self._read_counter = 0

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""

        for s in self._segments:
            if '+R' in s.locator:
                return True
        return False
    @synchronized
    def _copy_remote_blocks(self, remote_blocks={}):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
          Shared cache of remote to local block mappings.  This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories.
        """
        # NOTE(review): the mutable default dict is shared across calls; in
        # normal use the parent Collection passes its own cache.
        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        # Shallow copy: callers may replace/reorder the list, but the Range
        # objects themselves are shared.
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        # Buffer blocks in `other` cannot be referenced directly: a
        # non-writable one is referenced by its content locator, a writable
        # one is duplicated so later writes to `other` don't affect us.
        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            # Compare segment by segment; buffer block ids are normalized to
            # their content locators first.
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)
    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close().  Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not.  When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    # Partial segment: keep only the part before `size`.
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            # Extend the file with segments pointing into the shared
            # zero-filled padding block.
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)

            prefetch = None
            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
            if prefetch_lookahead:
                # Doing prefetch on every read() call is surprisingly expensive
                # when we're trying to deliver data at 600+ MiBps and want
                # the read() fast path to be as lightweight as possible.
                #
                # Only prefetching every 128 read operations
                # dramatically reduces the overhead while still
                # getting the benefit of prefetching (e.g. when
                # reading 128 KiB at a time, it checks for prefetch
                # every 16 MiB).
                self._read_counter = (self._read_counter+1) % 128
                if self._read_counter == 1:
                    prefetch = locators_and_ranges(self._segments,
                                                   offset + size,
                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
                                                   limit=(1+prefetch_lookahead))

        locs = set()
        data = []
        for lr in readsegs:
            # After the first chunk, use cache_only so we return a short read
            # rather than blocking on a fetch -- unless `exact` was requested.
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                locs.add(lr.locator)
            else:
                break

        if prefetch:
            for lr in prefetch:
                if lr.locator not in locs:
                    self.parent._my_block_manager().block_prefetch(lr.locator)
                    locs.add(lr.locator)

        if len(data) == 1:
            return data[0]
        else:
            return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            # Zero-fill the gap between current EOF and the write offset.
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            # Try compacting first; if the block is still too full, ship it
            # off to Keep and start a fresh one.
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
1159 """ 1160 if self.committed(): 1161 return 1162 1163 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: 1164 if self._current_bblock.state() == _BufferBlock.WRITABLE: 1165 self._current_bblock.repack_writes() 1166 if self._current_bblock.state() != _BufferBlock.DELETED: 1167 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) 1168 1169 if sync: 1170 to_delete = set() 1171 for s in self._segments: 1172 bb = self.parent._my_block_manager().get_bufferblock(s.locator) 1173 if bb: 1174 if bb.state() != _BufferBlock.COMMITTED: 1175 self.parent._my_block_manager().commit_bufferblock(bb, sync=True) 1176 to_delete.add(s.locator) 1177 s.locator = bb.locator() 1178 for s in to_delete: 1179 # Don't delete the bufferblock if it's owned by many files. It'll be 1180 # deleted after all of its owners are flush()ed. 1181 if self.parent._my_block_manager().get_bufferblock(s).owner is self: 1182 self.parent._my_block_manager().delete_bufferblock(s) 1183 1184 self.parent.notify(MOD, self.parent, self.name, (self, self)) 1185 1186 @must_be_writable 1187 @synchronized 1188 def add_segment(self, blocks, pos, size): 1189 """Add a segment to the end of the file. 
1190 1191 `pos` and `offset` reference a section of the stream described by 1192 `blocks` (a list of Range objects) 1193 1194 """ 1195 self._add_segment(blocks, pos, size) 1196 1197 def _add_segment(self, blocks, pos, size): 1198 """Internal implementation of add_segment.""" 1199 self.set_committed(False) 1200 for lr in locators_and_ranges(blocks, pos, size): 1201 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) 1202 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) 1203 self._segments.append(r) 1204 1205 @synchronized 1206 def size(self): 1207 """Get the file size.""" 1208 if self._segments: 1209 n = self._segments[-1] 1210 return n.range_start + n.range_size 1211 else: 1212 return 0 1213 1214 @synchronized 1215 def manifest_text(self, stream_name=".", portable_locators=False, 1216 normalize=False, only_committed=False): 1217 buf = "" 1218 filestream = [] 1219 for segment in self._segments: 1220 loc = segment.locator 1221 if self.parent._my_block_manager().is_bufferblock(loc): 1222 if only_committed: 1223 continue 1224 loc = self.parent._my_block_manager().get_bufferblock(loc).locator() 1225 if portable_locators: 1226 loc = KeepLocator(loc).stripped() 1227 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 1228 segment.segment_offset, segment.range_size)) 1229 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream})) 1230 buf += "\n" 1231 return buf 1232 1233 @must_be_writable 1234 @synchronized 1235 def _reparent(self, newparent, newname): 1236 self.set_committed(False) 1237 self.flush(sync=True) 1238 self.parent.remove(self.name) 1239 self.parent = newparent 1240 self.name = newname 1241 self.lock = self.parent.root_collection().lock 1242 1243 1244class ArvadosFileReader(ArvadosFileReaderBase): 1245 """Wraps ArvadosFile in a file-like object supporting reading only. 1246 1247 Be aware that this class is NOT thread safe as there is no locking around 1248 updating file pointer. 
1249 1250 """ 1251 1252 def __init__(self, arvadosfile, mode="r", num_retries=None): 1253 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries) 1254 self.arvadosfile = arvadosfile 1255 1256 def size(self): 1257 return self.arvadosfile.size() 1258 1259 def stream_name(self): 1260 return self.arvadosfile.parent.stream_name() 1261 1262 def readinto(self, b): 1263 data = self.read(len(b)) 1264 b[:len(data)] = data 1265 return len(data) 1266 1267 @_FileLikeObjectBase._before_close 1268 @retry_method 1269 def read(self, size=None, num_retries=None): 1270 """Read up to `size` bytes from the file and return the result. 1271 1272 Starts at the current file position. If `size` is None, read the 1273 entire remainder of the file. 1274 """ 1275 if size is None: 1276 data = [] 1277 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) 1278 while rd: 1279 data.append(rd) 1280 self._filepos += len(rd) 1281 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) 1282 return b''.join(data) 1283 else: 1284 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True) 1285 self._filepos += len(data) 1286 return data 1287 1288 @_FileLikeObjectBase._before_close 1289 @retry_method 1290 def readfrom(self, offset, size, num_retries=None): 1291 """Read up to `size` bytes from the stream, starting at the specified file offset. 1292 1293 This method does not change the file position. 1294 """ 1295 return self.arvadosfile.readfrom(offset, size, num_retries) 1296 1297 def flush(self): 1298 pass 1299 1300 1301class ArvadosFileWriter(ArvadosFileReader): 1302 """Wraps ArvadosFile in a file-like object supporting both reading and writing. 1303 1304 Be aware that this class is NOT thread safe as there is no locking around 1305 updating file pointer. 
1306 1307 """ 1308 1309 def __init__(self, arvadosfile, mode, num_retries=None): 1310 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries) 1311 self.arvadosfile.add_writer(self) 1312 1313 def writable(self): 1314 return True 1315 1316 @_FileLikeObjectBase._before_close 1317 @retry_method 1318 def write(self, data, num_retries=None): 1319 if self.mode[0] == "a": 1320 self._filepos = self.size() 1321 self.arvadosfile.writeto(self._filepos, data, num_retries) 1322 self._filepos += len(data) 1323 return len(data) 1324 1325 @_FileLikeObjectBase._before_close 1326 @retry_method 1327 def writelines(self, seq, num_retries=None): 1328 for s in seq: 1329 self.write(s, num_retries=num_retries) 1330 1331 @_FileLikeObjectBase._before_close 1332 def truncate(self, size=None): 1333 if size is None: 1334 size = self._filepos 1335 self.arvadosfile.truncate(size) 1336 1337 @_FileLikeObjectBase._before_close 1338 def flush(self): 1339 self.arvadosfile.flush() 1340 1341 def close(self, flush=True): 1342 if not self.closed: 1343 self.arvadosfile.remove_writer(self, flush) 1344 super(ArvadosFileWriter, self).close() 1345 1346 1347class WrappableFile(object): 1348 """An interface to an Arvados file that's compatible with io wrappers. 
1349 1350 """ 1351 def __init__(self, f): 1352 self.f = f 1353 self.closed = False 1354 def close(self): 1355 self.closed = True 1356 return self.f.close() 1357 def flush(self): 1358 return self.f.flush() 1359 def read(self, *args, **kwargs): 1360 return self.f.read(*args, **kwargs) 1361 def readable(self): 1362 return self.f.readable() 1363 def readinto(self, *args, **kwargs): 1364 return self.f.readinto(*args, **kwargs) 1365 def seek(self, *args, **kwargs): 1366 return self.f.seek(*args, **kwargs) 1367 def seekable(self): 1368 return self.f.seekable() 1369 def tell(self): 1370 return self.f.tell() 1371 def writable(self): 1372 return self.f.writable() 1373 def write(self, *args, **kwargs): 1374 return self.f.write(*args, **kwargs)
def split(path):
    """Split a stream path into ``(stream_name, file_name)``.

    The final '/'-separated component of `path` is the file name and
    everything before it is the stream name.  A path containing no '/'
    belongs to the default stream '.'.
    """
    if '/' in path:
        stream_name, _, file_name = path.rpartition('/')
    else:
        stream_name, file_name = '.', path
    return stream_name, file_name
split(path) -> streamname, filename
Separate the stream name and file name in a /-separated stream path and return a tuple (stream_name, file_name). If no stream name is available, assume ‘.’.
class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass
Raised when there’s a writable block without an owner on the BlockManager.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
class ArvadosFileReaderBase(_FileLikeObjectBase):
    """Common read-side behavior for Arvados file-like objects.

    Provides seeking, line-oriented reads, and decompression helpers on
    top of the `read()`/`readfrom()`/`size()` primitives that subclasses
    must implement.
    """

    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._filepos = 0
        self.num_retries = num_retries
        # (position, leftover bytes) pair used by readline() to avoid
        # re-reading data past the last returned newline.
        self._readline_cache = (None, None)

    def __iter__(self):
        # Iterate over lines, like a regular Python file object.
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        """Return the file name with a trailing .bz2 or .gz extension removed."""
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        """Move the file pointer and return the new absolute position.

        `whence` follows the standard file API: os.SEEK_SET (absolute),
        os.SEEK_CUR (relative to current position), os.SEEK_END (relative
        to file size).  Raises IOError(EINVAL) for a negative result;
        seeking past EOF is allowed.
        """
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        """Return the current file position."""
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        """Yield successive chunks of up to `size` bytes until EOF."""
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        """Read one line (up to `size` bytes) and return it as str.

        Bytes read past the newline are stashed in `_readline_cache` so a
        subsequent sequential readline() can reuse them.
        """
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            # Cache hit: resume from the leftover bytes of the prior call.
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            # No newline found: return everything read (EOF or size limit).
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        # Rewind past bytes beyond the returned line and cache them.
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        """Yield decompressed chunks produced by applying `decompress` to
        successive raw chunks of up to `size` bytes."""
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        """Iterate over the file's contents, transparently decompressing
        when the name ends in '.bz2' or '.gz'.

        Always rewinds to the beginning of the file first.
        """
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # wbits 16+MAX_WBITS tells zlib to expect a gzip header/trailer.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        """Return the file's contents as a list of str lines (keeping the
        line endings), stopping once at least `sizehint` bytes are read."""
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        # Abstract: subclasses must provide the file size.
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        # Abstract: subclasses must provide sequential reads.
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        # Abstract: subclasses must provide positional reads.
        raise IOError(errno.ENOSYS, "Not implemented")
    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        """Move the file pointer to `pos` and return the new absolute position.

        `whence` follows the standard file API: os.SEEK_SET (absolute),
        os.SEEK_CUR (relative to the current position), or os.SEEK_END
        (relative to the file size).

        Raises IOError(EINVAL) if the resulting offset would be negative;
        seeking past EOF is allowed.
        """
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos
    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        """Read and return one line (up to `size` bytes), decoded as str.

        Bytes read past the newline are kept in `_readline_cache` so the
        next sequential readline() call can reuse them instead of
        re-reading from the underlying file.
        """
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            # Cache hit: resume from leftover bytes of the previous call.
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            # No newline found: return everything read (EOF or size limit).
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        # Rewind the file position past any bytes beyond the returned line,
        # and stash them for the next call.
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()
    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        """Iterate over the file's contents in chunks of up to `size` bytes,
        decompressing on the fly when the name ends in '.bz2' or '.gz'.

        Always rewinds to the start of the file before reading.
        """
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            # wbits 16+MAX_WBITS tells zlib to expect a gzip header/trailer.
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)
    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        """Return the file's contents as a list of str lines (keeping line
        endings), stopping once at least `sizehint` bytes have been read."""
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)
Inherited Members
class StreamFileReader(ArvadosFileReaderBase):
    """Read-only access to a single file within a manifest stream."""

    class _NameAttribute(str):
        # The Python file API provides a plain .name attribute.
        # Older SDK provided a name() method.
        # This class provides both, for maximum compatibility.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        # File size is the end offset of the final segment.
        # NOTE(review): raises IndexError when `segments` is empty — confirm
        # callers never construct a StreamFileReader with no segments.
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        # Only the first available chunk is returned, so a read spanning a
        # segment boundary comes back short and the caller must read again.
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        # Unlike read(), this gathers every chunk covering the requested
        # range; the file position is not changed.
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        """Render this single file as a one-stream ('.') manifest text."""
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        # Only the first available chunk is returned, so a read that spans
        # a segment boundary comes back short; callers must read again.
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        # Advance the file position by what was actually read.
        self._filepos += len(data)
        return data
Read up to ‘size’ bytes from the stream, starting at the current file position
    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        # Gather every chunk covering the requested range; unlike read(),
        # this does not modify the current file position.
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)
Read up to ‘size’ bytes from the stream, starting at ‘start’
class StateChangeError(Exception):
    """Error raised for a disallowed state transition.

    Records the current state and the requested next state so the failed
    transition can be reported by the caller.
    """

    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state, self.nextstate = state, nextstate
Common base class for all non-exit exceptions.
Inherited Members
- builtins.BaseException
- with_traceback
- args
class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        # NOTE: the mutable defaults are safe only because `stream` and
        # `segments` are read, never mutated, in this constructor.
        self.parent = parent
        self.name = name
        self._writers = set()        # open ArvadosFileWriter instances
        self._committed = False      # True once contents are flushed to Keep
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None  # buffer block currently receiving writes
        self._read_counter = 0       # throttles prefetch checks in readfrom()

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def has_remote_blocks(self):
        """Returns True if any of the segment's locators has a +R signature"""

        for s in self._segments:
            if '+R' in s.locator:
                return True
        return False

    @synchronized
    def _copy_remote_blocks(self, remote_blocks={}):
        """Ask Keep to copy remote blocks and point to their local copies.

        This is called from the parent Collection.

        :remote_blocks:
            Shared cache of remote to local block mappings. This is used to avoid
            doing extra work when blocks are shared by more than one file in
            different subdirectories.
        """

        for s in self._segments:
            if '+R' in s.locator:
                try:
                    loc = remote_blocks[s.locator]
                except KeyError:
                    # Not in the shared cache yet; ask Keep and remember it.
                    loc = self.parent._my_keep().refresh_signature(s.locator)
                    remote_blocks[s.locator] = loc
                s.locator = loc
                self.parent.set_committed(False)
        return remote_blocks

    @synchronized
    def segments(self):
        # Shallow copy: callers get a stable list but share Range objects.
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        # Already committed (or being committed): reuse its
                        # permanent locator.
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        # Still writable: duplicate the buffer so the copies
                        # don't share mutable state.
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        """Two ArvadosFiles are equal when their segment lists describe the
        same data (comparing stripped locators and segment geometry)."""
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                # Resolve buffer block ids to their permanent locators
                # before comparing.
                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            # Dirtying a file dirties every enclosing collection.
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded. If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    # Truncation point falls inside this segment: keep a
                    # shortened copy of it.
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            # Grow the file by pointing new segments at the shared
            # zero-padding block.
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
         If False (default), return less data than requested if the read
         crosses a block boundary and the next block isn't cached.  If True,
         only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)

            prefetch = None
            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
            if prefetch_lookahead:
                # Doing prefetch on every read() call is surprisingly expensive
                # when we're trying to deliver data at 600+ MiBps and want
                # the read() fast path to be as lightweight as possible.
                #
                # Only prefetching every 128 read operations
                # dramatically reduces the overhead while still
                # getting the benefit of prefetching (e.g. when
                # reading 128 KiB at a time, it checks for prefetch
                # every 16 MiB).
                self._read_counter = (self._read_counter+1) % 128
                if self._read_counter == 1:
                    prefetch = locators_and_ranges(self._segments,
                                                   offset + size,
                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
                                                   limit=(1+prefetch_lookahead))

        locs = set()
        data = []
        for lr in readsegs:
            # After the first chunk, tolerate cache misses (unless `exact`)
            # so the caller gets a short read instead of blocking.
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                locs.add(lr.locator)
            else:
                break

        if prefetch:
            for lr in prefetch:
                if lr.locator not in locs:
                    self.parent._my_block_manager().block_prefetch(lr.locator)
                    locs.add(lr.locator)

        if len(data) == 1:
            # Single chunk: return the memoryview without copying.
            return data[0]
        else:
            return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        # NOTE(review): returns len(data) on the normal path, but None for
        # empty data and for writes split into KEEP_BLOCK_SIZE chunks.
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            # Writing past EOF implicitly zero-fills the gap.
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            # Try compacting first; if the block is still too full, commit
            # it asynchronously and start a fresh one.
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    # Repoint the segment at the permanent Keep locator.
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files. It'll be
                # deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `offset` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            # New segment starts where the current last segment ends.
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        """Return this file's content as single-stream manifest text.

        :portable_locators: strip permission hints from locators.
        :only_committed: skip segments still held in buffer blocks.
        """
        # NOTE(review): `normalize` is accepted but not referenced in this
        # body — presumably kept for API compatibility; confirm with callers.
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Move this file under `newparent` with the name `newname`,
        flushing pending writes first."""
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
Represent a file in a Collection.
ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.
This object may be accessed from multiple threads.
def __init__(self, parent, name, stream=None, segments=None):
    """ArvadosFile constructor.

    :parent:
      the owning collection object; must provide `root_collection()`

    :name:
      file name within the parent

    :stream:
      a list of Range objects representing a block stream

    :segments:
      a list of Range objects representing segments
    """
    # Default to None instead of the original mutable list defaults
    # ([]), which would be shared across all constructor calls.
    self.parent = parent
    self.name = name
    self._writers = set()
    self._committed = False
    self._segments = []
    self.lock = parent.root_collection().lock
    for s in (segments if segments is not None else []):
        self._add_segment(stream if stream is not None else [],
                          s.locator, s.range_size)
    self._current_bblock = None
    # Counter used to rate-limit prefetch checks in readfrom().
    self._read_counter = 0
ArvadosFile constructor.
:stream: a list of Range objects representing a block stream
:segments: a list of Range objects representing segments
@synchronized
def permission_expired(self, as_of_dt=None):
    """Return True if any of this file's segment locators has an expired permission signature."""
    return any(KeepLocator(seg.locator).permission_expired(as_of_dt)
               for seg in self._segments)
Returns True if any of the segments' locators has an expired permission signature.
@synchronized
def has_remote_blocks(self):
    """Return True if any of this file's segment locators contains a +R hint."""
    return any('+R' in seg.locator for seg in self._segments)
Returns True if any of the segments' locators has a +R signature.
@synchronized
def clone(self, new_parent, new_name):
    """Return a new ArvadosFile under `new_parent` holding a copy of this file's segments."""
    duplicate = ArvadosFile(new_parent, new_name)
    duplicate.replace_contents(self)
    return duplicate
Make a copy of this file.
@must_be_writable
@synchronized
def replace_contents(self, other):
    """Replace this file's segments with segments copied from `other`.

    Segments referencing buffer blocks that are still writable in
    `other` get a private duplicate of the buffer, so later writes to
    `other` cannot leak into this file; buffer blocks already
    committed (or committing) are referenced by their real locator.
    """
    # Translate each of other's bufferblock ids at most once, even when
    # several segments reference the same block.
    remapped = {}
    self._segments = []
    for seg in other.segments():
        loc = seg.locator
        if other.parent._my_block_manager().is_bufferblock(loc):
            if loc not in remapped:
                bblock = other.parent._my_block_manager().get_bufferblock(loc)
                if bblock.state() != _BufferBlock.WRITABLE:
                    remapped[loc] = bblock.locator()
                else:
                    remapped[loc] = self.parent._my_block_manager().dup_block(bblock, self).blockid
            loc = remapped[loc]
        self._segments.append(
            Range(loc, seg.range_start, seg.range_size, seg.segment_offset))

    self.set_committed(False)
Replace segments of this file with segments from another ArvadosFile
object.
@synchronized
def set_committed(self, value=True):
    """Update the committed flag.

    Setting it to False also propagates False up through all parents,
    since a modified file implies a modified collection.
    """
    if value == self._committed:
        # No change; avoid walking up the parent chain.
        return
    self._committed = value
    if self._committed is False and self.parent is not None:
        self.parent.set_committed(False)
Set committed flag.
If value is True, set committed to be True.
If value is False, set committed to be False for this and all parents.
@synchronized
def committed(self):
    """Return the current committed flag."""
    return self._committed
Get whether this is committed or not.
@synchronized
def add_writer(self, writer):
    """Register `writer` as an open writer on this file.

    Anything that is not an ArvadosFileWriter instance is ignored.
    """
    if isinstance(writer, ArvadosFileWriter):
        self._writers.add(writer)
Add an ArvadosFileWriter reference to the list of writers
@synchronized
def remove_writer(self, writer, flush):
    """Deregister a writer and perform block maintenance.

    Called from ArvadosFileWriter.close().  Large files (or an explicit
    `flush`) are flushed immediately; small fully-closed files are
    handed to the block manager for repacking with other small blocks.
    """
    self._writers.remove(writer)

    if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
        # Writer closed and the file is too big to benefit from repacking.
        self.flush()
    elif self.closed():
        # All writers closed and the file is small enough to repack.
        self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks.
def closed(self):
    """Return True when no writers remain registered on this file."""
    return not self._writers
Get whether this is closed or not. When the writers list is empty, the file is supposed to be closed.
@must_be_writable
@synchronized
def truncate(self, size):
    """Shrink or expand the file to exactly `size` bytes.

    Shrinking discards contents past `size`; expanding pads with zero
    bytes from the shared padding block.  Equal size is a no-op.
    """
    current = self.size()
    if size < current:
        kept = []
        for seg in self._segments:
            seg_end = seg.range_start + seg.range_size
            if seg.range_start >= size:
                # Segment lies entirely past the truncation point; done.
                break
            if size < seg_end:
                # Segment straddles the cut: keep only its head.
                head = Range(seg.locator, seg.range_start, size - seg.range_start, 0)
                head.segment_offset = seg.segment_offset
                kept.append(head)
                break
            kept.append(seg)

        self._segments = kept
        self.set_committed(False)
    elif size > current:
        padding = self.parent._my_block_manager().get_padding_block()
        shortfall = size - current
        # Append whole padding-block segments first...
        while shortfall > config.KEEP_BLOCK_SIZE:
            self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
            shortfall -= config.KEEP_BLOCK_SIZE
        # ...then one partial segment for any remainder.
        if shortfall > 0:
            self._segments.append(Range(padding.blockid, self.size(), shortfall, 0))
        self.set_committed(False)
    # size == self.size(): nothing to do
def readfrom(self, offset, size, num_retries, exact=False):
    """Read up to `size` bytes from the file starting at `offset`.

    :exact:
     If False (default), return less data than requested if the read
     crosses a block boundary and the next block isn't cached.  If
     True, only return less data than requested when hitting EOF.
    """

    with self.lock:
        if size == 0 or offset >= self.size():
            return b''
        readsegs = locators_and_ranges(self._segments, offset, size)

        prefetch = None
        prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
        if prefetch_lookahead:
            # Prefetch bookkeeping on every read() call is surprisingly
            # expensive at high throughput, so only check every 128th
            # call; with 128 KiB reads that's still every 16 MiB.
            self._read_counter = (self._read_counter + 1) % 128
            if self._read_counter == 1:
                prefetch = locators_and_ranges(
                    self._segments,
                    offset + size,
                    config.KEEP_BLOCK_SIZE * prefetch_lookahead,
                    limit=(1 + prefetch_lookahead))

    seen = set()
    chunks = []
    for lr in readsegs:
        # After the first chunk, a non-exact read is served from cache
        # only; a cache miss ends the read early at a block boundary.
        block = self.parent._my_block_manager().get_block_contents(
            lr.locator, num_retries=num_retries,
            cache_only=(bool(chunks) and not exact))
        if not block:
            break
        view = memoryview(block)
        chunks.append(view[lr.segment_offset:lr.segment_offset + lr.segment_size])
        seen.add(lr.locator)

    if prefetch:
        for lr in prefetch:
            if lr.locator not in seen:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                seen.add(lr.locator)

    # Single chunk: return the zero-copy memoryview slice directly.
    if len(chunks) == 1:
        return chunks[0]
    return b''.join(chunks)
Read up to `size` bytes from the file starting at `offset`.
:exact: If False (default), return less data than requested if the read crosses a block boundary and the next block isn’t cached. If True, only return less data than requested when hitting EOF.
@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
    """Write `data` to the file starting at `offset`.

    Existing bytes are overwritten and the file is extended as needed;
    writing past the current end first zero-pads up to `offset`.
    """
    if not isinstance(data, (bytes, memoryview)):
        data = data.encode()
    if len(data) == 0:
        return

    if offset > self.size():
        # Zero-fill the gap between EOF and the write position.
        self.truncate(offset)

    if len(data) > config.KEEP_BLOCK_SIZE:
        # Too big for a single buffer block: split into block-sized writes.
        view = memoryview(data)
        written = 0
        while written < len(data):
            self.writeto(offset + written,
                         view[written:written + config.KEEP_BLOCK_SIZE].tobytes(),
                         num_retries)
            written += config.KEEP_BLOCK_SIZE
        return

    self.set_committed(False)

    if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
        self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

    if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
        # Try to reclaim space from overwritten ranges; if the block is
        # still too full, ship it off and start a fresh one.
        self._current_bblock.repack_writes()
        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

    self._current_bblock.append(data)

    replace_range(self._segments, offset, len(data),
                  self._current_bblock.blockid,
                  self._current_bblock.write_pointer - len(data))

    self.parent.notify(WRITE, self.parent, self.name, (self, self))

    return len(data)
Write `data` to the file starting at `offset`.
This will update existing bytes and/or extend the size of the file as necessary.
@synchronized
def flush(self, sync=True, num_retries=0):
    """Flush the current bufferblock to Keep.

    :sync:
     If True, commit the block synchronously, waiting until it has been
     written.  If False, commit asynchronously and return as soon as
     the block is queued for upload.
    """
    if self.committed():
        return

    if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
        if self._current_bblock.state() == _BufferBlock.WRITABLE:
            self._current_bblock.repack_writes()
        if self._current_bblock.state() != _BufferBlock.DELETED:
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

    if sync:
        # Rewrite segments to point at permanent locators, committing
        # any remaining bufferblocks synchronously first.
        stale = set()
        for seg in self._segments:
            bb = self.parent._my_block_manager().get_bufferblock(seg.locator)
            if bb:
                if bb.state() != _BufferBlock.COMMITTED:
                    self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                stale.add(seg.locator)
                seg.locator = bb.locator()
        for blockid in stale:
            # Don't delete a bufferblock shared with other files; it's
            # deleted only after all of its owners have been flush()ed.
            if self.parent._my_block_manager().get_bufferblock(blockid).owner is self:
                self.parent._my_block_manager().delete_bufferblock(blockid)

    self.parent.notify(MOD, self.parent, self.name, (self, self))
Flush the current bufferblock to Keep.
:sync: If True, commit block synchronously, wait until buffer block has been written. If False, commit block asynchronously, return immediately after putting block into the keep put queue.
@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
    """Append a segment to the end of the file.

    `pos` and `size` select a section of the stream described by
    `blocks` (a list of Range objects).
    """
    self._add_segment(blocks, pos, size)
Add a segment to the end of the file.
`pos` and `size` reference a section of the stream described by `blocks` (a list of Range objects).
@synchronized
def size(self):
    """Return the file size in bytes (the end of the last segment)."""
    if not self._segments:
        return 0
    last = self._segments[-1]
    return last.range_start + last.range_size
Get the file size.
@synchronized
def manifest_text(self, stream_name=".", portable_locators=False,
                  normalize=False, only_committed=False):
    """Return single-stream manifest text describing this file.

    :stream_name:
      stream name to use in the manifest (default ".")

    :portable_locators:
      when True, strip permission hints from block locators

    :only_committed:
      when True, skip segments still held in uncommitted buffer blocks

    `normalize` is accepted for interface compatibility and not used
    in this method.
    """
    filestream = []
    for seg in self._segments:
        loc = seg.locator
        if self.parent._my_block_manager().is_bufferblock(loc):
            if only_committed:
                continue
            # Resolve the bufferblock to its (possibly temporary) locator.
            loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
        if portable_locators:
            loc = KeepLocator(loc).stripped()
        filestream.append(LocatorAndRange(
            loc, KeepLocator(loc).size, seg.segment_offset, seg.range_size))
    return ' '.join(normalize_stream(stream_name, {self.name: filestream})) + "\n"
class ArvadosFileReader(ArvadosFileReaderBase):
    """File-like, read-only wrapper around an ArvadosFile.

    Be aware that this class is NOT thread safe: nothing guards updates
    to the file pointer.
    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        """Return the size of the underlying file."""
        return self.arvadosfile.size()

    def stream_name(self):
        """Return the stream name of the underlying file's parent."""
        return self.arvadosfile.parent.stream_name()

    def readinto(self, b):
        """Read into buffer `b`; return the number of bytes stored."""
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the current file position.

        When `size` is None, read the entire remainder of the file.
        """
        if size is not None:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data
        # Read to EOF one Keep block at a time.
        chunks = []
        while True:
            chunk = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            if not chunk:
                break
            chunks.append(chunk)
            self._filepos += len(chunk)
        return b''.join(chunks)

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes starting at `offset`; the file position is unchanged."""
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        # Nothing to flush on a reader; present for file-API compatibility.
        pass
Wraps ArvadosFile in a file-like object supporting reading only.
Be aware that this class is NOT thread safe as there is no locking around updating file pointer.
@_FileLikeObjectBase._before_close
@retry_method
def read(self, size=None, num_retries=None):
    """Read up to `size` bytes from the current file position.

    When `size` is None, read the entire remainder of the file.
    """
    if size is not None:
        data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
        self._filepos += len(data)
        return data
    # Read to EOF one Keep block at a time.
    pieces = []
    while True:
        piece = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
        if not piece:
            break
        pieces.append(piece)
        self._filepos += len(piece)
    return b''.join(pieces)
@_FileLikeObjectBase._before_close
@retry_method
def readfrom(self, offset, size, num_retries=None):
    """Read up to `size` bytes starting at `offset`; the file position is unchanged."""
    return self.arvadosfile.readfrom(offset, size, num_retries)
Read up to `size` bytes from the stream, starting at the specified file offset.
This method does not change the file position.
class ArvadosFileWriter(ArvadosFileReader):
    """File-like wrapper around an ArvadosFile supporting reading and writing.

    Be aware that this class is NOT thread safe: nothing guards updates
    to the file pointer.
    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        """Write `data` at the current position; return the number of bytes written."""
        if self.mode[0] == "a":
            # Append mode always writes at end of file.
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        """Write each item of `seq` in order."""
        for item in seq:
            self.write(item, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        """Truncate (or extend) the file to `size`, defaulting to the current position."""
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        """Flush buffered writes to Keep."""
        self.arvadosfile.flush()

    def close(self, flush=True):
        """Deregister this writer and close the stream, optionally flushing first."""
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
        super(ArvadosFileWriter, self).close()
Wraps ArvadosFile in a file-like object supporting both reading and writing.
Be aware that this class is NOT thread safe as there is no locking around updating file pointer.
class WrappableFile(object):
    """Adapter giving an Arvados file the interface io wrappers expect.

    Every call is delegated to the wrapped file object `f`; `closed`
    is tracked locally.
    """

    def __init__(self, f):
        self.f = f
        self.closed = False

    def close(self):
        # Record the closed state before delegating, matching the
        # original's statement order.
        self.closed = True
        return self.f.close()

    def flush(self):
        return self.f.flush()

    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)

    def readable(self):
        return self.f.readable()

    def readinto(self, *args, **kwargs):
        return self.f.readinto(*args, **kwargs)

    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)

    def seekable(self):
        return self.f.seekable()

    def tell(self):
        return self.f.tell()

    def writable(self):
        return self.f.writable()

    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)
An interface to an Arvados file that’s compatible with io wrappers.