arvados.arvfile
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4 5from __future__ import absolute_import 6from __future__ import division 7from future import standard_library 8from future.utils import listitems, listvalues 9standard_library.install_aliases() 10from builtins import range 11from builtins import object 12import bz2 13import collections 14import copy 15import errno 16import functools 17import hashlib 18import logging 19import os 20import queue 21import re 22import sys 23import threading 24import uuid 25import zlib 26 27from . import config 28from .errors import KeepWriteError, AssertionError, ArgumentError 29from .keep import KeepLocator 30from ._normalize_stream import normalize_stream 31from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange 32from .retry import retry_method 33 34MOD = "mod" 35WRITE = "write" 36 37_logger = logging.getLogger('arvados.arvfile') 38 39def split(path): 40 """split(path) -> streamname, filename 41 42 Separate the stream name and file name in a /-separated stream path and 43 return a tuple (stream_name, file_name). If no stream name is available, 44 assume '.'. 45 46 """ 47 try: 48 stream_name, file_name = path.rsplit('/', 1) 49 except ValueError: # No / in string 50 stream_name, file_name = '.', path 51 return stream_name, file_name 52 53 54class UnownedBlockError(Exception): 55 """Raised when there's an writable block without an owner on the BlockManager.""" 56 pass 57 58 59class _FileLikeObjectBase(object): 60 def __init__(self, name, mode): 61 self.name = name 62 self.mode = mode 63 self.closed = False 64 65 @staticmethod 66 def _before_close(orig_func): 67 @functools.wraps(orig_func) 68 def before_close_wrapper(self, *args, **kwargs): 69 if self.closed: 70 raise ValueError("I/O operation on closed stream file") 71 return orig_func(self, *args, **kwargs) 72 return before_close_wrapper 73 74 def __enter__(self): 75 return self 76 77 def __exit__(self, exc_type, exc_value, traceback): 78 try: 79 self.close() 80 except Exception: 81 if exc_type is None: 82 raise 83 84 def close(self): 85 self.closed = True 86 87 88class ArvadosFileReaderBase(_FileLikeObjectBase): 89 def __init__(self, name, mode, num_retries=None): 90 super(ArvadosFileReaderBase, self).__init__(name, mode) 91 self._filepos = 0 92 self.num_retries = num_retries 93 self._readline_cache = (None, None) 94 95 def __iter__(self): 96 while True: 97 data = self.readline() 98 if not data: 99 break 100 yield data 101 102 def decompressed_name(self): 103 return re.sub(r'\.(bz2|gz)$', '', self.name) 104 105 @_FileLikeObjectBase._before_close 106 def seek(self, pos, whence=os.SEEK_SET): 107 if whence == os.SEEK_CUR: 108 pos += self._filepos 109 elif whence == os.SEEK_END: 110 pos += self.size() 111 if pos < 0: 112 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.") 113 self._filepos = pos 114 return self._filepos 115 116 def tell(self): 117 return self._filepos 118 119 def readable(self): 120 return True 121 122 def writable(self): 123 return False 124 125 def seekable(self): 126 return True 127 128 @_FileLikeObjectBase._before_close 129 @retry_method 130 def readall(self, size=2**20, num_retries=None): 131 while True: 132 data = self.read(size, num_retries=num_retries) 133 if len(data) == 0: 134 break 135 yield data 136 137 @_FileLikeObjectBase._before_close 138 @retry_method 139 def readline(self, size=float('inf'), num_retries=None): 140 cache_pos, cache_data = self._readline_cache 141 if self.tell() == 
cache_pos: 142 data = [cache_data] 143 self._filepos += len(cache_data) 144 else: 145 data = [b''] 146 data_size = len(data[-1]) 147 while (data_size < size) and (b'\n' not in data[-1]): 148 next_read = self.read(2 ** 20, num_retries=num_retries) 149 if not next_read: 150 break 151 data.append(next_read) 152 data_size += len(next_read) 153 data = b''.join(data) 154 try: 155 nextline_index = data.index(b'\n') + 1 156 except ValueError: 157 nextline_index = len(data) 158 nextline_index = min(nextline_index, size) 159 self._filepos -= len(data) - nextline_index 160 self._readline_cache = (self.tell(), data[nextline_index:]) 161 return data[:nextline_index].decode() 162 163 @_FileLikeObjectBase._before_close 164 @retry_method 165 def decompress(self, decompress, size, num_retries=None): 166 for segment in self.readall(size, num_retries=num_retries): 167 data = decompress(segment) 168 if data: 169 yield data 170 171 @_FileLikeObjectBase._before_close 172 @retry_method 173 def readall_decompressed(self, size=2**20, num_retries=None): 174 self.seek(0) 175 if self.name.endswith('.bz2'): 176 dc = bz2.BZ2Decompressor() 177 return self.decompress(dc.decompress, size, 178 num_retries=num_retries) 179 elif self.name.endswith('.gz'): 180 dc = zlib.decompressobj(16+zlib.MAX_WBITS) 181 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), 182 size, num_retries=num_retries) 183 else: 184 return self.readall(size, num_retries=num_retries) 185 186 @_FileLikeObjectBase._before_close 187 @retry_method 188 def readlines(self, sizehint=float('inf'), num_retries=None): 189 data = [] 190 data_size = 0 191 for s in self.readall(num_retries=num_retries): 192 data.append(s) 193 data_size += len(s) 194 if data_size >= sizehint: 195 break 196 return b''.join(data).decode().splitlines(True) 197 198 def size(self): 199 raise IOError(errno.ENOSYS, "Not implemented") 200 201 def read(self, size, num_retries=None): 202 raise IOError(errno.ENOSYS, "Not implemented") 203 204 def readfrom(self, start, size, num_retries=None): 205 raise IOError(errno.ENOSYS, "Not implemented") 206 207 208class StreamFileReader(ArvadosFileReaderBase): 209 class _NameAttribute(str): 210 # The Python file API provides a plain .name attribute. 211 # Older SDK provided a name() method. 212 # This class provides both, for maximum compatibility. 
213 def __call__(self): 214 return self 215 216 def __init__(self, stream, segments, name): 217 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries) 218 self._stream = stream 219 self.segments = segments 220 221 def stream_name(self): 222 return self._stream.name() 223 224 def size(self): 225 n = self.segments[-1] 226 return n.range_start + n.range_size 227 228 @_FileLikeObjectBase._before_close 229 @retry_method 230 def read(self, size, num_retries=None): 231 """Read up to 'size' bytes from the stream, starting at the current file position""" 232 if size == 0: 233 return b'' 234 235 data = b'' 236 available_chunks = locators_and_ranges(self.segments, self._filepos, size) 237 if available_chunks: 238 lr = available_chunks[0] 239 data = self._stream.readfrom(lr.locator+lr.segment_offset, 240 lr.segment_size, 241 num_retries=num_retries) 242 243 self._filepos += len(data) 244 return data 245 246 @_FileLikeObjectBase._before_close 247 @retry_method 248 def readfrom(self, start, size, num_retries=None): 249 """Read up to 'size' bytes from the stream, starting at 'start'""" 250 if size == 0: 251 return b'' 252 253 data = [] 254 for lr in locators_and_ranges(self.segments, start, size): 255 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, 256 num_retries=num_retries)) 257 return b''.join(data) 258 259 def as_manifest(self): 260 segs = [] 261 for r in self.segments: 262 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size)) 263 return " ".join(normalize_stream(".", {self.name: segs})) + "\n" 264 265 266def synchronized(orig_func): 267 @functools.wraps(orig_func) 268 def synchronized_wrapper(self, *args, **kwargs): 269 with self.lock: 270 return orig_func(self, *args, **kwargs) 271 return synchronized_wrapper 272 273 274class StateChangeError(Exception): 275 def __init__(self, message, state, nextstate): 276 super(StateChangeError, self).__init__(message) 277 self.state = state 278 self.nextstate = nextstate 279 280class _BufferBlock(object): 281 """A stand-in for a Keep block that is in the process of being written. 282 283 Writers can append to it, get the size, and compute the Keep locator. 284 There are three valid states: 285 286 WRITABLE 287 Can append to block. 288 289 PENDING 290 Block is in the process of being uploaded to Keep, append is an error. 291 292 COMMITTED 293 The block has been written to Keep, its internal buffer has been 294 released, fetching the block will fetch it via keep client (since we 295 discarded the internal copy), and identifiers referring to the BufferBlock 296 can be replaced with the block locator. 297 298 """ 299 300 WRITABLE = 0 301 PENDING = 1 302 COMMITTED = 2 303 ERROR = 3 304 DELETED = 4 305 306 def __init__(self, blockid, starting_capacity, owner): 307 """ 308 :blockid: 309 the identifier for this block 310 311 :starting_capacity: 312 the initial buffer capacity 313 314 :owner: 315 ArvadosFile that owns this block 316 317 """ 318 self.blockid = blockid 319 self.buffer_block = bytearray(starting_capacity) 320 self.buffer_view = memoryview(self.buffer_block) 321 self.write_pointer = 0 322 self._state = _BufferBlock.WRITABLE 323 self._locator = None 324 self.owner = owner 325 self.lock = threading.Lock() 326 self.wait_for_commit = threading.Event() 327 self.error = None 328 329 @synchronized 330 def append(self, data): 331 """Append some data to the buffer. 332 333 Only valid if the block is in WRITABLE state. 
Implements an expanding 334 buffer, doubling capacity as needed to accomdate all the data. 335 336 """ 337 if self._state == _BufferBlock.WRITABLE: 338 if not isinstance(data, bytes) and not isinstance(data, memoryview): 339 data = data.encode() 340 while (self.write_pointer+len(data)) > len(self.buffer_block): 341 new_buffer_block = bytearray(len(self.buffer_block) * 2) 342 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer] 343 self.buffer_block = new_buffer_block 344 self.buffer_view = memoryview(self.buffer_block) 345 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data 346 self.write_pointer += len(data) 347 self._locator = None 348 else: 349 raise AssertionError("Buffer block is not writable") 350 351 STATE_TRANSITIONS = frozenset([ 352 (WRITABLE, PENDING), 353 (PENDING, COMMITTED), 354 (PENDING, ERROR), 355 (ERROR, PENDING)]) 356 357 @synchronized 358 def set_state(self, nextstate, val=None): 359 if (self._state, nextstate) not in self.STATE_TRANSITIONS: 360 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate) 361 self._state = nextstate 362 363 if self._state == _BufferBlock.PENDING: 364 self.wait_for_commit.clear() 365 366 if self._state == _BufferBlock.COMMITTED: 367 self._locator = val 368 self.buffer_view = None 369 self.buffer_block = None 370 self.wait_for_commit.set() 371 372 if self._state == _BufferBlock.ERROR: 373 self.error = val 374 self.wait_for_commit.set() 375 376 @synchronized 377 def state(self): 378 return self._state 379 380 def size(self): 381 """The amount of data written to the buffer.""" 382 return self.write_pointer 383 384 @synchronized 385 def locator(self): 386 """The Keep locator for this buffer's contents.""" 387 if self._locator is None: 388 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size()) 389 return self._locator 390 391 @synchronized 392 def clone(self, new_blockid, owner): 393 if self._state == _BufferBlock.COMMITTED: 394 raise AssertionError("Cannot duplicate committed buffer block") 395 bufferblock = _BufferBlock(new_blockid, self.size(), owner) 396 bufferblock.append(self.buffer_view[0:self.size()]) 397 return bufferblock 398 399 @synchronized 400 def clear(self): 401 self._state = _BufferBlock.DELETED 402 self.owner = None 403 self.buffer_block = None 404 self.buffer_view = None 405 406 @synchronized 407 def repack_writes(self): 408 """Optimize buffer block by repacking segments in file sequence. 409 410 When the client makes random writes, they appear in the buffer block in 411 the sequence they were written rather than the sequence they appear in 412 the file. This makes for inefficient, fragmented manifests. Attempt 413 to optimize by repacking writes in file sequence. 414 415 """ 416 if self._state != _BufferBlock.WRITABLE: 417 raise AssertionError("Cannot repack non-writable block") 418 419 segs = self.owner.segments() 420 421 # Collect the segments that reference the buffer block. 422 bufferblock_segs = [s for s in segs if s.locator == self.blockid] 423 424 # Collect total data referenced by segments (could be smaller than 425 # bufferblock size if a portion of the file was written and 426 # then overwritten). 
427 write_total = sum([s.range_size for s in bufferblock_segs]) 428 429 if write_total < self.size() or len(bufferblock_segs) > 1: 430 # If there's more than one segment referencing this block, it is 431 # due to out-of-order writes and will produce a fragmented 432 # manifest, so try to optimize by re-packing into a new buffer. 433 contents = self.buffer_view[0:self.write_pointer].tobytes() 434 new_bb = _BufferBlock(None, write_total, None) 435 for t in bufferblock_segs: 436 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) 437 t.segment_offset = new_bb.size() - t.range_size 438 439 self.buffer_block = new_bb.buffer_block 440 self.buffer_view = new_bb.buffer_view 441 self.write_pointer = new_bb.write_pointer 442 self._locator = None 443 new_bb.clear() 444 self.owner.set_segments(segs) 445 446 def __repr__(self): 447 return "<BufferBlock %s>" % (self.blockid) 448 449 450class NoopLock(object): 451 def __enter__(self): 452 return self 453 454 def __exit__(self, exc_type, exc_value, traceback): 455 pass 456 457 def acquire(self, blocking=False): 458 pass 459 460 def release(self): 461 pass 462 463 464def must_be_writable(orig_func): 465 @functools.wraps(orig_func) 466 def must_be_writable_wrapper(self, *args, **kwargs): 467 if not self.writable(): 468 raise IOError(errno.EROFS, "Collection is read-only.") 469 return orig_func(self, *args, **kwargs) 470 return must_be_writable_wrapper 471 472 473class _BlockManager(object): 474 """BlockManager handles buffer blocks. 475 476 Also handles background block uploads, and background block prefetch for a 477 Collection of ArvadosFiles. 478 479 """ 480 481 DEFAULT_PUT_THREADS = 2 482 483 def __init__(self, keep, 484 copies=None, 485 put_threads=None, 486 num_retries=None, 487 storage_classes_func=None): 488 """keep: KeepClient object to use""" 489 self._keep = keep 490 self._bufferblocks = collections.OrderedDict() 491 self._put_queue = None 492 self._put_threads = None 493 self.lock = threading.Lock() 494 self.prefetch_lookahead = self._keep.num_prefetch_threads 495 self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS 496 self.copies = copies 497 self.storage_classes = storage_classes_func or (lambda: []) 498 self._pending_write_size = 0 499 self.threads_lock = threading.Lock() 500 self.padding_block = None 501 self.num_retries = num_retries 502 503 @synchronized 504 def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): 505 """Allocate a new, empty bufferblock in WRITABLE state and return it. 506 507 :blockid: 508 optional block identifier, otherwise one will be automatically assigned 509 510 :starting_capacity: 511 optional capacity, otherwise will use default capacity 512 513 :owner: 514 ArvadosFile that owns this block 515 516 """ 517 return self._alloc_bufferblock(blockid, starting_capacity, owner) 518 519 def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): 520 if blockid is None: 521 blockid = str(uuid.uuid4()) 522 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) 523 self._bufferblocks[bufferblock.blockid] = bufferblock 524 return bufferblock 525 526 @synchronized 527 def dup_block(self, block, owner): 528 """Create a new bufferblock initialized with the content of an existing bufferblock. 529 530 :block: 531 the buffer block to copy. 
532 533 :owner: 534 ArvadosFile that owns the new block 535 536 """ 537 new_blockid = str(uuid.uuid4()) 538 bufferblock = block.clone(new_blockid, owner) 539 self._bufferblocks[bufferblock.blockid] = bufferblock 540 return bufferblock 541 542 @synchronized 543 def is_bufferblock(self, locator): 544 return locator in self._bufferblocks 545 546 def _commit_bufferblock_worker(self): 547 """Background uploader thread.""" 548 549 while True: 550 try: 551 bufferblock = self._put_queue.get() 552 if bufferblock is None: 553 return 554 555 if self.copies is None: 556 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes()) 557 else: 558 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes()) 559 bufferblock.set_state(_BufferBlock.COMMITTED, loc) 560 except Exception as e: 561 bufferblock.set_state(_BufferBlock.ERROR, e) 562 finally: 563 if self._put_queue is not None: 564 self._put_queue.task_done() 565 566 def start_put_threads(self): 567 with self.threads_lock: 568 if self._put_threads is None: 569 # Start uploader threads. 570 571 # If we don't limit the Queue size, the upload queue can quickly 572 # grow to take up gigabytes of RAM if the writing process is 573 # generating data more quickly than it can be sent to the Keep 574 # servers. 575 # 576 # With two upload threads and a queue size of 2, this means up to 4 577 # blocks pending. If they are full 64 MiB blocks, that means up to 578 # 256 MiB of internal buffering, which is the same size as the 579 # default download block cache in KeepClient. 580 self._put_queue = queue.Queue(maxsize=2) 581 582 self._put_threads = [] 583 for i in range(0, self.num_put_threads): 584 thread = threading.Thread(target=self._commit_bufferblock_worker) 585 self._put_threads.append(thread) 586 thread.daemon = True 587 thread.start() 588 589 @synchronized 590 def stop_threads(self): 591 """Shut down and wait for background upload and download threads to finish.""" 592 593 if self._put_threads is not None: 594 for t in self._put_threads: 595 self._put_queue.put(None) 596 for t in self._put_threads: 597 t.join() 598 self._put_threads = None 599 self._put_queue = None 600 601 def __enter__(self): 602 return self 603 604 def __exit__(self, exc_type, exc_value, traceback): 605 self.stop_threads() 606 607 @synchronized 608 def repack_small_blocks(self, force=False, sync=False, closed_file_size=0): 609 """Packs small blocks together before uploading""" 610 611 self._pending_write_size += closed_file_size 612 613 # Check if there are enough small blocks for filling up one in full 614 if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)): 615 return 616 617 # Search blocks ready for getting packed together before being 618 # committed to Keep. 619 # A WRITABLE block always has an owner. 620 # A WRITABLE block with its owner.closed() implies that its 621 # size is <= KEEP_BLOCK_SIZE/2. 622 try: 623 small_blocks = [b for b in listvalues(self._bufferblocks) 624 if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] 625 except AttributeError: 626 # Writable blocks without owner shouldn't exist. 
627 raise UnownedBlockError() 628 629 if len(small_blocks) <= 1: 630 # Not enough small blocks for repacking 631 return 632 633 for bb in small_blocks: 634 bb.repack_writes() 635 636 # Update the pending write size count with its true value, just in case 637 # some small file was opened, written and closed several times. 638 self._pending_write_size = sum([b.size() for b in small_blocks]) 639 640 if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: 641 return 642 643 new_bb = self._alloc_bufferblock() 644 new_bb.owner = [] 645 files = [] 646 while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: 647 bb = small_blocks.pop(0) 648 new_bb.owner.append(bb.owner) 649 self._pending_write_size -= bb.size() 650 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) 651 files.append((bb, new_bb.write_pointer - bb.size())) 652 653 self.commit_bufferblock(new_bb, sync=sync) 654 655 for bb, new_bb_segment_offset in files: 656 newsegs = bb.owner.segments() 657 for s in newsegs: 658 if s.locator == bb.blockid: 659 s.locator = new_bb.blockid 660 s.segment_offset = new_bb_segment_offset+s.segment_offset 661 bb.owner.set_segments(newsegs) 662 self._delete_bufferblock(bb.blockid) 663 664 def commit_bufferblock(self, block, sync): 665 """Initiate a background upload of a bufferblock. 666 667 :block: 668 The block object to upload 669 670 :sync: 671 If `sync` is True, upload the block synchronously. 672 If `sync` is False, upload the block asynchronously. This will 673 return immediately unless the upload queue is at capacity, in 674 which case it will wait on an upload queue slot. 675 676 """ 677 try: 678 # Mark the block as PENDING so to disallow any more appends. 679 block.set_state(_BufferBlock.PENDING) 680 except StateChangeError as e: 681 if e.state == _BufferBlock.PENDING: 682 if sync: 683 block.wait_for_commit.wait() 684 else: 685 return 686 if block.state() == _BufferBlock.COMMITTED: 687 return 688 elif block.state() == _BufferBlock.ERROR: 689 raise block.error 690 else: 691 raise 692 693 if sync: 694 try: 695 if self.copies is None: 696 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes()) 697 else: 698 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes()) 699 block.set_state(_BufferBlock.COMMITTED, loc) 700 except Exception as e: 701 block.set_state(_BufferBlock.ERROR, e) 702 raise 703 else: 704 self.start_put_threads() 705 self._put_queue.put(block) 706 707 @synchronized 708 def get_bufferblock(self, locator): 709 return self._bufferblocks.get(locator) 710 711 @synchronized 712 def get_padding_block(self): 713 """Get a bufferblock 64 MB in size consisting of all zeros, used as padding 714 when using truncate() to extend the size of a file. 
715 716 For reference (and possible future optimization), the md5sum of the 717 padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864 718 719 """ 720 721 if self.padding_block is None: 722 self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE) 723 self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE 724 self.commit_bufferblock(self.padding_block, False) 725 return self.padding_block 726 727 @synchronized 728 def delete_bufferblock(self, locator): 729 self._delete_bufferblock(locator) 730 731 def _delete_bufferblock(self, locator): 732 if locator in self._bufferblocks: 733 bb = self._bufferblocks[locator] 734 bb.clear() 735 del self._bufferblocks[locator] 736 737 def get_block_contents(self, locator, num_retries, cache_only=False): 738 """Fetch a block. 739 740 First checks to see if the locator is a BufferBlock and return that, if 741 not, passes the request through to KeepClient.get(). 742 743 """ 744 with self.lock: 745 if locator in self._bufferblocks: 746 bufferblock = self._bufferblocks[locator] 747 if bufferblock.state() != _BufferBlock.COMMITTED: 748 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes() 749 else: 750 locator = bufferblock._locator 751 if cache_only: 752 return self._keep.get_from_cache(locator) 753 else: 754 return self._keep.get(locator, num_retries=num_retries) 755 756 def commit_all(self): 757 """Commit all outstanding buffer blocks. 758 759 This is a synchronous call, and will not return until all buffer blocks 760 are uploaded. Raises KeepWriteError() if any blocks failed to upload. 761 762 """ 763 self.repack_small_blocks(force=True, sync=True) 764 765 with self.lock: 766 items = listitems(self._bufferblocks) 767 768 for k,v in items: 769 if v.state() != _BufferBlock.COMMITTED and v.owner: 770 # Ignore blocks with a list of owners, as if they're not in COMMITTED 771 # state, they're already being committed asynchronously. 772 if isinstance(v.owner, ArvadosFile): 773 v.owner.flush(sync=False) 774 775 with self.lock: 776 if self._put_queue is not None: 777 self._put_queue.join() 778 779 err = [] 780 for k,v in items: 781 if v.state() == _BufferBlock.ERROR: 782 err.append((v.locator(), v.error)) 783 if err: 784 raise KeepWriteError("Error writing some blocks", err, label="block") 785 786 for k,v in items: 787 # flush again with sync=True to remove committed bufferblocks from 788 # the segments. 789 if v.owner: 790 if isinstance(v.owner, ArvadosFile): 791 v.owner.flush(sync=True) 792 elif isinstance(v.owner, list) and len(v.owner) > 0: 793 # This bufferblock is referenced by many files as a result 794 # of repacking small blocks, so don't delete it when flushing 795 # its owners, just do it after flushing them all. 796 for owner in v.owner: 797 owner.flush(sync=True) 798 self.delete_bufferblock(k) 799 800 self.stop_threads() 801 802 def block_prefetch(self, locator): 803 """Initiate a background download of a block. 804 """ 805 806 if not self.prefetch_lookahead: 807 return 808 809 with self.lock: 810 if locator in self._bufferblocks: 811 return 812 813 self._keep.block_prefetch(locator) 814 815 816class ArvadosFile(object): 817 """Represent a file in a Collection. 818 819 ArvadosFile manages the underlying representation of a file in Keep as a 820 sequence of segments spanning a set of blocks, and implements random 821 read/write access. 822 823 This object may be accessed from multiple threads. 
824 825 """ 826 827 __slots__ = ('parent', 'name', '_writers', '_committed', 828 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter') 829 830 def __init__(self, parent, name, stream=[], segments=[]): 831 """ 832 ArvadosFile constructor. 833 834 :stream: 835 a list of Range objects representing a block stream 836 837 :segments: 838 a list of Range objects representing segments 839 """ 840 self.parent = parent 841 self.name = name 842 self._writers = set() 843 self._committed = False 844 self._segments = [] 845 self.lock = parent.root_collection().lock 846 for s in segments: 847 self._add_segment(stream, s.locator, s.range_size) 848 self._current_bblock = None 849 self._read_counter = 0 850 851 def writable(self): 852 return self.parent.writable() 853 854 @synchronized 855 def permission_expired(self, as_of_dt=None): 856 """Returns True if any of the segment's locators is expired""" 857 for r in self._segments: 858 if KeepLocator(r.locator).permission_expired(as_of_dt): 859 return True 860 return False 861 862 @synchronized 863 def has_remote_blocks(self): 864 """Returns True if any of the segment's locators has a +R signature""" 865 866 for s in self._segments: 867 if '+R' in s.locator: 868 return True 869 return False 870 871 @synchronized 872 def _copy_remote_blocks(self, remote_blocks={}): 873 """Ask Keep to copy remote blocks and point to their local copies. 874 875 This is called from the parent Collection. 876 877 :remote_blocks: 878 Shared cache of remote to local block mappings. This is used to avoid 879 doing extra work when blocks are shared by more than one file in 880 different subdirectories. 881 """ 882 883 for s in self._segments: 884 if '+R' in s.locator: 885 try: 886 loc = remote_blocks[s.locator] 887 except KeyError: 888 loc = self.parent._my_keep().refresh_signature(s.locator) 889 remote_blocks[s.locator] = loc 890 s.locator = loc 891 self.parent.set_committed(False) 892 return remote_blocks 893 894 @synchronized 895 def segments(self): 896 return copy.copy(self._segments) 897 898 @synchronized 899 def clone(self, new_parent, new_name): 900 """Make a copy of this file.""" 901 cp = ArvadosFile(new_parent, new_name) 902 cp.replace_contents(self) 903 return cp 904 905 @must_be_writable 906 @synchronized 907 def replace_contents(self, other): 908 """Replace segments of this file with segments from another `ArvadosFile` object.""" 909 910 map_loc = {} 911 self._segments = [] 912 for other_segment in other.segments(): 913 new_loc = other_segment.locator 914 if other.parent._my_block_manager().is_bufferblock(other_segment.locator): 915 if other_segment.locator not in map_loc: 916 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator) 917 if bufferblock.state() != _BufferBlock.WRITABLE: 918 map_loc[other_segment.locator] = bufferblock.locator() 919 else: 920 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid 921 new_loc = map_loc[other_segment.locator] 922 923 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) 924 925 self.set_committed(False) 926 927 def __eq__(self, other): 928 if other is self: 929 return True 930 if not isinstance(other, ArvadosFile): 931 return False 932 933 othersegs = other.segments() 934 with self.lock: 935 if len(self._segments) != len(othersegs): 936 return False 937 for i in range(0, len(othersegs)): 938 seg1 = self._segments[i] 939 seg2 = othersegs[i] 940 loc1 = seg1.locator 941 loc2 
= seg2.locator 942 943 if self.parent._my_block_manager().is_bufferblock(loc1): 944 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator() 945 946 if other.parent._my_block_manager().is_bufferblock(loc2): 947 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator() 948 949 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or 950 seg1.range_start != seg2.range_start or 951 seg1.range_size != seg2.range_size or 952 seg1.segment_offset != seg2.segment_offset): 953 return False 954 955 return True 956 957 def __ne__(self, other): 958 return not self.__eq__(other) 959 960 @synchronized 961 def set_segments(self, segs): 962 self._segments = segs 963 964 @synchronized 965 def set_committed(self, value=True): 966 """Set committed flag. 967 968 If value is True, set committed to be True. 969 970 If value is False, set committed to be False for this and all parents. 971 """ 972 if value == self._committed: 973 return 974 self._committed = value 975 if self._committed is False and self.parent is not None: 976 self.parent.set_committed(False) 977 978 @synchronized 979 def committed(self): 980 """Get whether this is committed or not.""" 981 return self._committed 982 983 @synchronized 984 def add_writer(self, writer): 985 """Add an ArvadosFileWriter reference to the list of writers""" 986 if isinstance(writer, ArvadosFileWriter): 987 self._writers.add(writer) 988 989 @synchronized 990 def remove_writer(self, writer, flush): 991 """ 992 Called from ArvadosFileWriter.close(). Remove a writer reference from the list 993 and do some block maintenance tasks. 994 """ 995 self._writers.remove(writer) 996 997 if flush or self.size() > config.KEEP_BLOCK_SIZE // 2: 998 # File writer closed, not small enough for repacking 999 self.flush() 1000 elif self.closed(): 1001 # All writers closed and size is adequate for repacking 1002 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size()) 1003 1004 def closed(self): 1005 """ 1006 Get whether this is closed or not. When the writers list is empty, the file 1007 is supposed to be closed. 1008 """ 1009 return len(self._writers) == 0 1010 1011 @must_be_writable 1012 @synchronized 1013 def truncate(self, size): 1014 """Shrink or expand the size of the file. 1015 1016 If `size` is less than the size of the file, the file contents after 1017 `size` will be discarded. If `size` is greater than the current size 1018 of the file, it will be filled with zero bytes. 
1019 1020 """ 1021 if size < self.size(): 1022 new_segs = [] 1023 for r in self._segments: 1024 range_end = r.range_start+r.range_size 1025 if r.range_start >= size: 1026 # segment is past the trucate size, all done 1027 break 1028 elif size < range_end: 1029 nr = Range(r.locator, r.range_start, size - r.range_start, 0) 1030 nr.segment_offset = r.segment_offset 1031 new_segs.append(nr) 1032 break 1033 else: 1034 new_segs.append(r) 1035 1036 self._segments = new_segs 1037 self.set_committed(False) 1038 elif size > self.size(): 1039 padding = self.parent._my_block_manager().get_padding_block() 1040 diff = size - self.size() 1041 while diff > config.KEEP_BLOCK_SIZE: 1042 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0)) 1043 diff -= config.KEEP_BLOCK_SIZE 1044 if diff > 0: 1045 self._segments.append(Range(padding.blockid, self.size(), diff, 0)) 1046 self.set_committed(False) 1047 else: 1048 # size == self.size() 1049 pass 1050 1051 def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False): 1052 """Read up to `size` bytes from the file starting at `offset`. 1053 1054 Arguments: 1055 1056 * exact: bool --- If False (default), return less data than 1057 requested if the read crosses a block boundary and the next 1058 block isn't cached. If True, only return less data than 1059 requested when hitting EOF. 1060 1061 * return_memoryview: bool --- If False (default) return a 1062 `bytes` object, which may entail making a copy in some 1063 situations. If True, return a `memoryview` object which may 1064 avoid making a copy, but may be incompatible with code 1065 expecting a `bytes` object. 1066 1067 """ 1068 1069 with self.lock: 1070 if size == 0 or offset >= self.size(): 1071 return memoryview(b'') if return_memoryview else b'' 1072 readsegs = locators_and_ranges(self._segments, offset, size) 1073 1074 prefetch = None 1075 prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead 1076 if prefetch_lookahead: 1077 # Doing prefetch on every read() call is surprisingly expensive 1078 # when we're trying to deliver data at 600+ MiBps and want 1079 # the read() fast path to be as lightweight as possible. 1080 # 1081 # Only prefetching every 128 read operations 1082 # dramatically reduces the overhead while still 1083 # getting the benefit of prefetching (e.g. when 1084 # reading 128 KiB at a time, it checks for prefetch 1085 # every 16 MiB). 
1086 self._read_counter = (self._read_counter+1) % 128 1087 if self._read_counter == 1: 1088 prefetch = locators_and_ranges(self._segments, 1089 offset + size, 1090 config.KEEP_BLOCK_SIZE * prefetch_lookahead, 1091 limit=(1+prefetch_lookahead)) 1092 1093 locs = set() 1094 data = [] 1095 for lr in readsegs: 1096 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact)) 1097 if block: 1098 blockview = memoryview(block) 1099 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size]) 1100 locs.add(lr.locator) 1101 else: 1102 break 1103 1104 if prefetch: 1105 for lr in prefetch: 1106 if lr.locator not in locs: 1107 self.parent._my_block_manager().block_prefetch(lr.locator) 1108 locs.add(lr.locator) 1109 1110 if len(data) == 1: 1111 return data[0] if return_memoryview else data[0].tobytes() 1112 else: 1113 return memoryview(b''.join(data)) if return_memoryview else b''.join(data) 1114 1115 1116 @must_be_writable 1117 @synchronized 1118 def writeto(self, offset, data, num_retries): 1119 """Write `data` to the file starting at `offset`. 1120 1121 This will update existing bytes and/or extend the size of the file as 1122 necessary. 1123 1124 """ 1125 if not isinstance(data, bytes) and not isinstance(data, memoryview): 1126 data = data.encode() 1127 if len(data) == 0: 1128 return 1129 1130 if offset > self.size(): 1131 self.truncate(offset) 1132 1133 if len(data) > config.KEEP_BLOCK_SIZE: 1134 # Chunk it up into smaller writes 1135 n = 0 1136 dataview = memoryview(data) 1137 while n < len(data): 1138 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries) 1139 n += config.KEEP_BLOCK_SIZE 1140 return 1141 1142 self.set_committed(False) 1143 1144 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: 1145 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) 1146 1147 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: 1148 self._current_bblock.repack_writes() 1149 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: 1150 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) 1151 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) 1152 1153 self._current_bblock.append(data) 1154 1155 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) 1156 1157 self.parent.notify(WRITE, self.parent, self.name, (self, self)) 1158 1159 return len(data) 1160 1161 @synchronized 1162 def flush(self, sync=True, num_retries=0): 1163 """Flush the current bufferblock to Keep. 1164 1165 :sync: 1166 If True, commit block synchronously, wait until buffer block has been written. 1167 If False, commit block asynchronously, return immediately after putting block into 1168 the keep put queue. 
1169 """ 1170 if self.committed(): 1171 return 1172 1173 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: 1174 if self._current_bblock.state() == _BufferBlock.WRITABLE: 1175 self._current_bblock.repack_writes() 1176 if self._current_bblock.state() != _BufferBlock.DELETED: 1177 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) 1178 1179 if sync: 1180 to_delete = set() 1181 for s in self._segments: 1182 bb = self.parent._my_block_manager().get_bufferblock(s.locator) 1183 if bb: 1184 if bb.state() != _BufferBlock.COMMITTED: 1185 self.parent._my_block_manager().commit_bufferblock(bb, sync=True) 1186 to_delete.add(s.locator) 1187 s.locator = bb.locator() 1188 for s in to_delete: 1189 # Don't delete the bufferblock if it's owned by many files. It'll be 1190 # deleted after all of its owners are flush()ed. 1191 if self.parent._my_block_manager().get_bufferblock(s).owner is self: 1192 self.parent._my_block_manager().delete_bufferblock(s) 1193 1194 self.parent.notify(MOD, self.parent, self.name, (self, self)) 1195 1196 @must_be_writable 1197 @synchronized 1198 def add_segment(self, blocks, pos, size): 1199 """Add a segment to the end of the file. 1200 1201 `pos` and `offset` reference a section of the stream described by 1202 `blocks` (a list of Range objects) 1203 1204 """ 1205 self._add_segment(blocks, pos, size) 1206 1207 def _add_segment(self, blocks, pos, size): 1208 """Internal implementation of add_segment.""" 1209 self.set_committed(False) 1210 for lr in locators_and_ranges(blocks, pos, size): 1211 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) 1212 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) 1213 self._segments.append(r) 1214 1215 @synchronized 1216 def size(self): 1217 """Get the file size.""" 1218 if self._segments: 1219 n = self._segments[-1] 1220 return n.range_start + n.range_size 1221 else: 1222 return 0 1223 1224 @synchronized 1225 def manifest_text(self, stream_name=".", portable_locators=False, 1226 normalize=False, only_committed=False): 1227 buf = "" 1228 filestream = [] 1229 for segment in self._segments: 1230 loc = segment.locator 1231 if self.parent._my_block_manager().is_bufferblock(loc): 1232 if only_committed: 1233 continue 1234 loc = self.parent._my_block_manager().get_bufferblock(loc).locator() 1235 if portable_locators: 1236 loc = KeepLocator(loc).stripped() 1237 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 1238 segment.segment_offset, segment.range_size)) 1239 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream})) 1240 buf += "\n" 1241 return buf 1242 1243 @must_be_writable 1244 @synchronized 1245 def _reparent(self, newparent, newname): 1246 self.set_committed(False) 1247 self.flush(sync=True) 1248 self.parent.remove(self.name) 1249 self.parent = newparent 1250 self.name = newname 1251 self.lock = self.parent.root_collection().lock 1252 1253 1254class ArvadosFileReader(ArvadosFileReaderBase): 1255 """Wraps ArvadosFile in a file-like object supporting reading only. 1256 1257 Be aware that this class is NOT thread safe as there is no locking around 1258 updating file pointer. 
1259 1260 """ 1261 1262 def __init__(self, arvadosfile, mode="r", num_retries=None): 1263 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries) 1264 self.arvadosfile = arvadosfile 1265 1266 def size(self): 1267 return self.arvadosfile.size() 1268 1269 def stream_name(self): 1270 return self.arvadosfile.parent.stream_name() 1271 1272 def readinto(self, b): 1273 data = self.read(len(b)) 1274 b[:len(data)] = data 1275 return len(data) 1276 1277 @_FileLikeObjectBase._before_close 1278 @retry_method 1279 def read(self, size=-1, num_retries=None, return_memoryview=False): 1280 """Read up to `size` bytes from the file and return the result. 1281 1282 Starts at the current file position. If `size` is negative or None, 1283 read the entire remainder of the file. 1284 1285 Returns None if the file pointer is at the end of the file. 1286 1287 Returns a `bytes` object, unless `return_memoryview` is True, 1288 in which case it returns a memory view, which may avoid an 1289 unnecessary data copy in some situations. 1290 1291 """ 1292 if size < 0 or size is None: 1293 data = [] 1294 # 1295 # specify exact=False, return_memoryview=True here so that we 1296 # only copy data once into the final buffer. 1297 # 1298 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True) 1299 while rd: 1300 data.append(rd) 1301 self._filepos += len(rd) 1302 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True) 1303 return memoryview(b''.join(data)) if return_memoryview else b''.join(data) 1304 else: 1305 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview) 1306 self._filepos += len(data) 1307 return data 1308 1309 @_FileLikeObjectBase._before_close 1310 @retry_method 1311 def readfrom(self, offset, size, num_retries=None, return_memoryview=False): 1312 """Read up to `size` bytes from the stream, starting at the specified file offset. 1313 1314 This method does not change the file position. 1315 1316 Returns a `bytes` object, unless `return_memoryview` is True, 1317 in which case it returns a memory view, which may avoid an 1318 unnecessary data copy in some situations. 1319 1320 """ 1321 return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview) 1322 1323 def flush(self): 1324 pass 1325 1326 1327class ArvadosFileWriter(ArvadosFileReader): 1328 """Wraps ArvadosFile in a file-like object supporting both reading and writing. 1329 1330 Be aware that this class is NOT thread safe as there is no locking around 1331 updating file pointer. 
1332 1333 """ 1334 1335 def __init__(self, arvadosfile, mode, num_retries=None): 1336 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries) 1337 self.arvadosfile.add_writer(self) 1338 1339 def writable(self): 1340 return True 1341 1342 @_FileLikeObjectBase._before_close 1343 @retry_method 1344 def write(self, data, num_retries=None): 1345 if self.mode[0] == "a": 1346 self._filepos = self.size() 1347 self.arvadosfile.writeto(self._filepos, data, num_retries) 1348 self._filepos += len(data) 1349 return len(data) 1350 1351 @_FileLikeObjectBase._before_close 1352 @retry_method 1353 def writelines(self, seq, num_retries=None): 1354 for s in seq: 1355 self.write(s, num_retries=num_retries) 1356 1357 @_FileLikeObjectBase._before_close 1358 def truncate(self, size=None): 1359 if size is None: 1360 size = self._filepos 1361 self.arvadosfile.truncate(size) 1362 1363 @_FileLikeObjectBase._before_close 1364 def flush(self): 1365 self.arvadosfile.flush() 1366 1367 def close(self, flush=True): 1368 if not self.closed: 1369 self.arvadosfile.remove_writer(self, flush) 1370 super(ArvadosFileWriter, self).close() 1371 1372 1373class WrappableFile(object): 1374 """An interface to an Arvados file that's compatible with io wrappers. 1375 1376 """ 1377 def __init__(self, f): 1378 self.f = f 1379 self.closed = False 1380 def close(self): 1381 self.closed = True 1382 return self.f.close() 1383 def flush(self): 1384 return self.f.flush() 1385 def read(self, *args, **kwargs): 1386 return self.f.read(*args, **kwargs) 1387 def readable(self): 1388 return self.f.readable() 1389 def readinto(self, *args, **kwargs): 1390 return self.f.readinto(*args, **kwargs) 1391 def seek(self, *args, **kwargs): 1392 return self.f.seek(*args, **kwargs) 1393 def seekable(self): 1394 return self.f.seekable() 1395 def tell(self): 1396 return self.f.tell() 1397 def writable(self): 1398 return self.f.writable() 1399 def write(self, *args, **kwargs): 1400 return self.f.write(*args, **kwargs)
40def split(path): 41 """split(path) -> streamname, filename 42 43 Separate the stream name and file name in a /-separated stream path and 44 return a tuple (stream_name, file_name). If no stream name is available, 45 assume '.'. 46 47 """ 48 try: 49 stream_name, file_name = path.rsplit('/', 1) 50 except ValueError: # No / in string 51 stream_name, file_name = '.', path 52 return stream_name, file_name
split(path) -> streamname, filename
Separate the stream name and file name in a /-separated stream path and return a tuple (stream_name, file_name). If no stream name is available, assume ‘.’.
55class UnownedBlockError(Exception): 56 """Raised when there's an writable block without an owner on the BlockManager.""" 57 pass
Raised when there’s an writable block without an owner on the BlockManager.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
89class ArvadosFileReaderBase(_FileLikeObjectBase): 90 def __init__(self, name, mode, num_retries=None): 91 super(ArvadosFileReaderBase, self).__init__(name, mode) 92 self._filepos = 0 93 self.num_retries = num_retries 94 self._readline_cache = (None, None) 95 96 def __iter__(self): 97 while True: 98 data = self.readline() 99 if not data: 100 break 101 yield data 102 103 def decompressed_name(self): 104 return re.sub(r'\.(bz2|gz)$', '', self.name) 105 106 @_FileLikeObjectBase._before_close 107 def seek(self, pos, whence=os.SEEK_SET): 108 if whence == os.SEEK_CUR: 109 pos += self._filepos 110 elif whence == os.SEEK_END: 111 pos += self.size() 112 if pos < 0: 113 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.") 114 self._filepos = pos 115 return self._filepos 116 117 def tell(self): 118 return self._filepos 119 120 def readable(self): 121 return True 122 123 def writable(self): 124 return False 125 126 def seekable(self): 127 return True 128 129 @_FileLikeObjectBase._before_close 130 @retry_method 131 def readall(self, size=2**20, num_retries=None): 132 while True: 133 data = self.read(size, num_retries=num_retries) 134 if len(data) == 0: 135 break 136 yield data 137 138 @_FileLikeObjectBase._before_close 139 @retry_method 140 def readline(self, size=float('inf'), num_retries=None): 141 cache_pos, cache_data = self._readline_cache 142 if self.tell() == cache_pos: 143 data = [cache_data] 144 self._filepos += len(cache_data) 145 else: 146 data = [b''] 147 data_size = len(data[-1]) 148 while (data_size < size) and (b'\n' not in data[-1]): 149 next_read = self.read(2 ** 20, num_retries=num_retries) 150 if not next_read: 151 break 152 data.append(next_read) 153 data_size += len(next_read) 154 data = b''.join(data) 155 try: 156 nextline_index = data.index(b'\n') + 1 157 except ValueError: 158 nextline_index = len(data) 159 nextline_index = min(nextline_index, size) 160 self._filepos -= len(data) - nextline_index 161 self._readline_cache = (self.tell(), data[nextline_index:]) 162 return data[:nextline_index].decode() 163 164 @_FileLikeObjectBase._before_close 165 @retry_method 166 def decompress(self, decompress, size, num_retries=None): 167 for segment in self.readall(size, num_retries=num_retries): 168 data = decompress(segment) 169 if data: 170 yield data 171 172 @_FileLikeObjectBase._before_close 173 @retry_method 174 def readall_decompressed(self, size=2**20, num_retries=None): 175 self.seek(0) 176 if self.name.endswith('.bz2'): 177 dc = bz2.BZ2Decompressor() 178 return self.decompress(dc.decompress, size, 179 num_retries=num_retries) 180 elif self.name.endswith('.gz'): 181 dc = zlib.decompressobj(16+zlib.MAX_WBITS) 182 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), 183 size, num_retries=num_retries) 184 else: 185 return self.readall(size, num_retries=num_retries) 186 187 @_FileLikeObjectBase._before_close 188 @retry_method 189 def readlines(self, sizehint=float('inf'), num_retries=None): 190 data = [] 191 data_size = 0 192 for s in self.readall(num_retries=num_retries): 193 data.append(s) 194 data_size += len(s) 195 if data_size >= sizehint: 196 break 197 return b''.join(data).decode().splitlines(True) 198 199 def size(self): 200 raise IOError(errno.ENOSYS, "Not implemented") 201 202 def read(self, size, num_retries=None): 203 raise IOError(errno.ENOSYS, "Not implemented") 204 205 def readfrom(self, start, size, num_retries=None): 206 raise IOError(errno.ENOSYS, "Not implemented")
106 @_FileLikeObjectBase._before_close 107 def seek(self, pos, whence=os.SEEK_SET): 108 if whence == os.SEEK_CUR: 109 pos += self._filepos 110 elif whence == os.SEEK_END: 111 pos += self.size() 112 if pos < 0: 113 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.") 114 self._filepos = pos 115 return self._filepos
138 @_FileLikeObjectBase._before_close 139 @retry_method 140 def readline(self, size=float('inf'), num_retries=None): 141 cache_pos, cache_data = self._readline_cache 142 if self.tell() == cache_pos: 143 data = [cache_data] 144 self._filepos += len(cache_data) 145 else: 146 data = [b''] 147 data_size = len(data[-1]) 148 while (data_size < size) and (b'\n' not in data[-1]): 149 next_read = self.read(2 ** 20, num_retries=num_retries) 150 if not next_read: 151 break 152 data.append(next_read) 153 data_size += len(next_read) 154 data = b''.join(data) 155 try: 156 nextline_index = data.index(b'\n') + 1 157 except ValueError: 158 nextline_index = len(data) 159 nextline_index = min(nextline_index, size) 160 self._filepos -= len(data) - nextline_index 161 self._readline_cache = (self.tell(), data[nextline_index:]) 162 return data[:nextline_index].decode()
172 @_FileLikeObjectBase._before_close 173 @retry_method 174 def readall_decompressed(self, size=2**20, num_retries=None): 175 self.seek(0) 176 if self.name.endswith('.bz2'): 177 dc = bz2.BZ2Decompressor() 178 return self.decompress(dc.decompress, size, 179 num_retries=num_retries) 180 elif self.name.endswith('.gz'): 181 dc = zlib.decompressobj(16+zlib.MAX_WBITS) 182 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), 183 size, num_retries=num_retries) 184 else: 185 return self.readall(size, num_retries=num_retries)
187 @_FileLikeObjectBase._before_close 188 @retry_method 189 def readlines(self, sizehint=float('inf'), num_retries=None): 190 data = [] 191 data_size = 0 192 for s in self.readall(num_retries=num_retries): 193 data.append(s) 194 data_size += len(s) 195 if data_size >= sizehint: 196 break 197 return b''.join(data).decode().splitlines(True)
Inherited Members
209class StreamFileReader(ArvadosFileReaderBase): 210 class _NameAttribute(str): 211 # The Python file API provides a plain .name attribute. 212 # Older SDK provided a name() method. 213 # This class provides both, for maximum compatibility. 214 def __call__(self): 215 return self 216 217 def __init__(self, stream, segments, name): 218 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries) 219 self._stream = stream 220 self.segments = segments 221 222 def stream_name(self): 223 return self._stream.name() 224 225 def size(self): 226 n = self.segments[-1] 227 return n.range_start + n.range_size 228 229 @_FileLikeObjectBase._before_close 230 @retry_method 231 def read(self, size, num_retries=None): 232 """Read up to 'size' bytes from the stream, starting at the current file position""" 233 if size == 0: 234 return b'' 235 236 data = b'' 237 available_chunks = locators_and_ranges(self.segments, self._filepos, size) 238 if available_chunks: 239 lr = available_chunks[0] 240 data = self._stream.readfrom(lr.locator+lr.segment_offset, 241 lr.segment_size, 242 num_retries=num_retries) 243 244 self._filepos += len(data) 245 return data 246 247 @_FileLikeObjectBase._before_close 248 @retry_method 249 def readfrom(self, start, size, num_retries=None): 250 """Read up to 'size' bytes from the stream, starting at 'start'""" 251 if size == 0: 252 return b'' 253 254 data = [] 255 for lr in locators_and_ranges(self.segments, start, size): 256 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, 257 num_retries=num_retries)) 258 return b''.join(data) 259 260 def as_manifest(self): 261 segs = [] 262 for r in self.segments: 263 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size)) 264 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
229 @_FileLikeObjectBase._before_close 230 @retry_method 231 def read(self, size, num_retries=None): 232 """Read up to 'size' bytes from the stream, starting at the current file position""" 233 if size == 0: 234 return b'' 235 236 data = b'' 237 available_chunks = locators_and_ranges(self.segments, self._filepos, size) 238 if available_chunks: 239 lr = available_chunks[0] 240 data = self._stream.readfrom(lr.locator+lr.segment_offset, 241 lr.segment_size, 242 num_retries=num_retries) 243 244 self._filepos += len(data) 245 return data
Read up to ‘size’ bytes from the stream, starting at the current file position
247 @_FileLikeObjectBase._before_close 248 @retry_method 249 def readfrom(self, start, size, num_retries=None): 250 """Read up to 'size' bytes from the stream, starting at 'start'""" 251 if size == 0: 252 return b'' 253 254 data = [] 255 for lr in locators_and_ranges(self.segments, start, size): 256 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, 257 num_retries=num_retries)) 258 return b''.join(data)
Read up to ‘size’ bytes from the stream, starting at ‘start’
275class StateChangeError(Exception): 276 def __init__(self, message, state, nextstate): 277 super(StateChangeError, self).__init__(message) 278 self.state = state 279 self.nextstate = nextstate
Common base class for all non-exit exceptions.
Inherited Members
- builtins.BaseException
- with_traceback
- args
817class ArvadosFile(object): 818 """Represent a file in a Collection. 819 820 ArvadosFile manages the underlying representation of a file in Keep as a 821 sequence of segments spanning a set of blocks, and implements random 822 read/write access. 823 824 This object may be accessed from multiple threads. 825 826 """ 827 828 __slots__ = ('parent', 'name', '_writers', '_committed', 829 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter') 830 831 def __init__(self, parent, name, stream=[], segments=[]): 832 """ 833 ArvadosFile constructor. 834 835 :stream: 836 a list of Range objects representing a block stream 837 838 :segments: 839 a list of Range objects representing segments 840 """ 841 self.parent = parent 842 self.name = name 843 self._writers = set() 844 self._committed = False 845 self._segments = [] 846 self.lock = parent.root_collection().lock 847 for s in segments: 848 self._add_segment(stream, s.locator, s.range_size) 849 self._current_bblock = None 850 self._read_counter = 0 851 852 def writable(self): 853 return self.parent.writable() 854 855 @synchronized 856 def permission_expired(self, as_of_dt=None): 857 """Returns True if any of the segment's locators is expired""" 858 for r in self._segments: 859 if KeepLocator(r.locator).permission_expired(as_of_dt): 860 return True 861 return False 862 863 @synchronized 864 def has_remote_blocks(self): 865 """Returns True if any of the segment's locators has a +R signature""" 866 867 for s in self._segments: 868 if '+R' in s.locator: 869 return True 870 return False 871 872 @synchronized 873 def _copy_remote_blocks(self, remote_blocks={}): 874 """Ask Keep to copy remote blocks and point to their local copies. 875 876 This is called from the parent Collection. 877 878 :remote_blocks: 879 Shared cache of remote to local block mappings. This is used to avoid 880 doing extra work when blocks are shared by more than one file in 881 different subdirectories. 
882 """ 883 884 for s in self._segments: 885 if '+R' in s.locator: 886 try: 887 loc = remote_blocks[s.locator] 888 except KeyError: 889 loc = self.parent._my_keep().refresh_signature(s.locator) 890 remote_blocks[s.locator] = loc 891 s.locator = loc 892 self.parent.set_committed(False) 893 return remote_blocks 894 895 @synchronized 896 def segments(self): 897 return copy.copy(self._segments) 898 899 @synchronized 900 def clone(self, new_parent, new_name): 901 """Make a copy of this file.""" 902 cp = ArvadosFile(new_parent, new_name) 903 cp.replace_contents(self) 904 return cp 905 906 @must_be_writable 907 @synchronized 908 def replace_contents(self, other): 909 """Replace segments of this file with segments from another `ArvadosFile` object.""" 910 911 map_loc = {} 912 self._segments = [] 913 for other_segment in other.segments(): 914 new_loc = other_segment.locator 915 if other.parent._my_block_manager().is_bufferblock(other_segment.locator): 916 if other_segment.locator not in map_loc: 917 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator) 918 if bufferblock.state() != _BufferBlock.WRITABLE: 919 map_loc[other_segment.locator] = bufferblock.locator() 920 else: 921 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid 922 new_loc = map_loc[other_segment.locator] 923 924 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) 925 926 self.set_committed(False) 927 928 def __eq__(self, other): 929 if other is self: 930 return True 931 if not isinstance(other, ArvadosFile): 932 return False 933 934 othersegs = other.segments() 935 with self.lock: 936 if len(self._segments) != len(othersegs): 937 return False 938 for i in range(0, len(othersegs)): 939 seg1 = self._segments[i] 940 seg2 = othersegs[i] 941 loc1 = seg1.locator 942 loc2 = seg2.locator 943 944 if self.parent._my_block_manager().is_bufferblock(loc1): 945 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator() 946 947 if other.parent._my_block_manager().is_bufferblock(loc2): 948 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator() 949 950 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or 951 seg1.range_start != seg2.range_start or 952 seg1.range_size != seg2.range_size or 953 seg1.segment_offset != seg2.segment_offset): 954 return False 955 956 return True 957 958 def __ne__(self, other): 959 return not self.__eq__(other) 960 961 @synchronized 962 def set_segments(self, segs): 963 self._segments = segs 964 965 @synchronized 966 def set_committed(self, value=True): 967 """Set committed flag. 968 969 If value is True, set committed to be True. 970 971 If value is False, set committed to be False for this and all parents. 972 """ 973 if value == self._committed: 974 return 975 self._committed = value 976 if self._committed is False and self.parent is not None: 977 self.parent.set_committed(False) 978 979 @synchronized 980 def committed(self): 981 """Get whether this is committed or not.""" 982 return self._committed 983 984 @synchronized 985 def add_writer(self, writer): 986 """Add an ArvadosFileWriter reference to the list of writers""" 987 if isinstance(writer, ArvadosFileWriter): 988 self._writers.add(writer) 989 990 @synchronized 991 def remove_writer(self, writer, flush): 992 """ 993 Called from ArvadosFileWriter.close(). Remove a writer reference from the list 994 and do some block maintenance tasks. 
995 """ 996 self._writers.remove(writer) 997 998 if flush or self.size() > config.KEEP_BLOCK_SIZE // 2: 999 # File writer closed, not small enough for repacking 1000 self.flush() 1001 elif self.closed(): 1002 # All writers closed and size is adequate for repacking 1003 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size()) 1004 1005 def closed(self): 1006 """ 1007 Get whether this is closed or not. When the writers list is empty, the file 1008 is supposed to be closed. 1009 """ 1010 return len(self._writers) == 0 1011 1012 @must_be_writable 1013 @synchronized 1014 def truncate(self, size): 1015 """Shrink or expand the size of the file. 1016 1017 If `size` is less than the size of the file, the file contents after 1018 `size` will be discarded. If `size` is greater than the current size 1019 of the file, it will be filled with zero bytes. 1020 1021 """ 1022 if size < self.size(): 1023 new_segs = [] 1024 for r in self._segments: 1025 range_end = r.range_start+r.range_size 1026 if r.range_start >= size: 1027 # segment is past the trucate size, all done 1028 break 1029 elif size < range_end: 1030 nr = Range(r.locator, r.range_start, size - r.range_start, 0) 1031 nr.segment_offset = r.segment_offset 1032 new_segs.append(nr) 1033 break 1034 else: 1035 new_segs.append(r) 1036 1037 self._segments = new_segs 1038 self.set_committed(False) 1039 elif size > self.size(): 1040 padding = self.parent._my_block_manager().get_padding_block() 1041 diff = size - self.size() 1042 while diff > config.KEEP_BLOCK_SIZE: 1043 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0)) 1044 diff -= config.KEEP_BLOCK_SIZE 1045 if diff > 0: 1046 self._segments.append(Range(padding.blockid, self.size(), diff, 0)) 1047 self.set_committed(False) 1048 else: 1049 # size == self.size() 1050 pass 1051 1052 def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False): 1053 """Read up to `size` bytes from the file starting at `offset`. 1054 1055 Arguments: 1056 1057 * exact: bool --- If False (default), return less data than 1058 requested if the read crosses a block boundary and the next 1059 block isn't cached. If True, only return less data than 1060 requested when hitting EOF. 1061 1062 * return_memoryview: bool --- If False (default) return a 1063 `bytes` object, which may entail making a copy in some 1064 situations. If True, return a `memoryview` object which may 1065 avoid making a copy, but may be incompatible with code 1066 expecting a `bytes` object. 1067 1068 """ 1069 1070 with self.lock: 1071 if size == 0 or offset >= self.size(): 1072 return memoryview(b'') if return_memoryview else b'' 1073 readsegs = locators_and_ranges(self._segments, offset, size) 1074 1075 prefetch = None 1076 prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead 1077 if prefetch_lookahead: 1078 # Doing prefetch on every read() call is surprisingly expensive 1079 # when we're trying to deliver data at 600+ MiBps and want 1080 # the read() fast path to be as lightweight as possible. 1081 # 1082 # Only prefetching every 128 read operations 1083 # dramatically reduces the overhead while still 1084 # getting the benefit of prefetching (e.g. when 1085 # reading 128 KiB at a time, it checks for prefetch 1086 # every 16 MiB). 
1087 self._read_counter = (self._read_counter+1) % 128 1088 if self._read_counter == 1: 1089 prefetch = locators_and_ranges(self._segments, 1090 offset + size, 1091 config.KEEP_BLOCK_SIZE * prefetch_lookahead, 1092 limit=(1+prefetch_lookahead)) 1093 1094 locs = set() 1095 data = [] 1096 for lr in readsegs: 1097 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact)) 1098 if block: 1099 blockview = memoryview(block) 1100 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size]) 1101 locs.add(lr.locator) 1102 else: 1103 break 1104 1105 if prefetch: 1106 for lr in prefetch: 1107 if lr.locator not in locs: 1108 self.parent._my_block_manager().block_prefetch(lr.locator) 1109 locs.add(lr.locator) 1110 1111 if len(data) == 1: 1112 return data[0] if return_memoryview else data[0].tobytes() 1113 else: 1114 return memoryview(b''.join(data)) if return_memoryview else b''.join(data) 1115 1116 1117 @must_be_writable 1118 @synchronized 1119 def writeto(self, offset, data, num_retries): 1120 """Write `data` to the file starting at `offset`. 1121 1122 This will update existing bytes and/or extend the size of the file as 1123 necessary. 1124 1125 """ 1126 if not isinstance(data, bytes) and not isinstance(data, memoryview): 1127 data = data.encode() 1128 if len(data) == 0: 1129 return 1130 1131 if offset > self.size(): 1132 self.truncate(offset) 1133 1134 if len(data) > config.KEEP_BLOCK_SIZE: 1135 # Chunk it up into smaller writes 1136 n = 0 1137 dataview = memoryview(data) 1138 while n < len(data): 1139 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries) 1140 n += config.KEEP_BLOCK_SIZE 1141 return 1142 1143 self.set_committed(False) 1144 1145 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: 1146 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) 1147 1148 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: 1149 self._current_bblock.repack_writes() 1150 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: 1151 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) 1152 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) 1153 1154 self._current_bblock.append(data) 1155 1156 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) 1157 1158 self.parent.notify(WRITE, self.parent, self.name, (self, self)) 1159 1160 return len(data) 1161 1162 @synchronized 1163 def flush(self, sync=True, num_retries=0): 1164 """Flush the current bufferblock to Keep. 1165 1166 :sync: 1167 If True, commit block synchronously, wait until buffer block has been written. 1168 If False, commit block asynchronously, return immediately after putting block into 1169 the keep put queue. 
1170 """ 1171 if self.committed(): 1172 return 1173 1174 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: 1175 if self._current_bblock.state() == _BufferBlock.WRITABLE: 1176 self._current_bblock.repack_writes() 1177 if self._current_bblock.state() != _BufferBlock.DELETED: 1178 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) 1179 1180 if sync: 1181 to_delete = set() 1182 for s in self._segments: 1183 bb = self.parent._my_block_manager().get_bufferblock(s.locator) 1184 if bb: 1185 if bb.state() != _BufferBlock.COMMITTED: 1186 self.parent._my_block_manager().commit_bufferblock(bb, sync=True) 1187 to_delete.add(s.locator) 1188 s.locator = bb.locator() 1189 for s in to_delete: 1190 # Don't delete the bufferblock if it's owned by many files. It'll be 1191 # deleted after all of its owners are flush()ed. 1192 if self.parent._my_block_manager().get_bufferblock(s).owner is self: 1193 self.parent._my_block_manager().delete_bufferblock(s) 1194 1195 self.parent.notify(MOD, self.parent, self.name, (self, self)) 1196 1197 @must_be_writable 1198 @synchronized 1199 def add_segment(self, blocks, pos, size): 1200 """Add a segment to the end of the file. 1201 1202 `pos` and `offset` reference a section of the stream described by 1203 `blocks` (a list of Range objects) 1204 1205 """ 1206 self._add_segment(blocks, pos, size) 1207 1208 def _add_segment(self, blocks, pos, size): 1209 """Internal implementation of add_segment.""" 1210 self.set_committed(False) 1211 for lr in locators_and_ranges(blocks, pos, size): 1212 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) 1213 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) 1214 self._segments.append(r) 1215 1216 @synchronized 1217 def size(self): 1218 """Get the file size.""" 1219 if self._segments: 1220 n = self._segments[-1] 1221 return n.range_start + n.range_size 1222 else: 1223 return 0 1224 1225 @synchronized 1226 def manifest_text(self, stream_name=".", portable_locators=False, 1227 normalize=False, only_committed=False): 1228 buf = "" 1229 filestream = [] 1230 for segment in self._segments: 1231 loc = segment.locator 1232 if self.parent._my_block_manager().is_bufferblock(loc): 1233 if only_committed: 1234 continue 1235 loc = self.parent._my_block_manager().get_bufferblock(loc).locator() 1236 if portable_locators: 1237 loc = KeepLocator(loc).stripped() 1238 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 1239 segment.segment_offset, segment.range_size)) 1240 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream})) 1241 buf += "\n" 1242 return buf 1243 1244 @must_be_writable 1245 @synchronized 1246 def _reparent(self, newparent, newname): 1247 self.set_committed(False) 1248 self.flush(sync=True) 1249 self.parent.remove(self.name) 1250 self.parent = newparent 1251 self.name = newname 1252 self.lock = self.parent.root_collection().lock
Represent a file in a Collection.
ArvadosFile manages the underlying representation of a file in Keep as a sequence of segments spanning a set of blocks, and implements random read/write access.
This object may be accessed from multiple threads.
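An `ArvadosFile` is normally obtained through a collection rather than constructed directly. A minimal sketch, assuming `arvados.collection.Collection` with its `open()` and `find()` helpers is available and that an Arvados API host and token are configured in the environment (the path and contents are illustrative):

    import arvados.collection

    # Writing through the collection API allocates the underlying
    # ArvadosFile and its buffer block for us.
    coll = arvados.collection.Collection()
    with coll.open('subdir/data.txt', 'wb') as writer:   # an ArvadosFileWriter
        writer.write(b'hello keep\n')

    # find() returns the ArvadosFile backing an existing path.
    arvfile = coll.find('subdir/data.txt')
    print(arvfile.size())   # 11

The later sketches in this section reuse this `arvfile` object.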
831 def __init__(self, parent, name, stream=[], segments=[]): 832 """ 833 ArvadosFile constructor. 834 835 :stream: 836 a list of Range objects representing a block stream 837 838 :segments: 839 a list of Range objects representing segments 840 """ 841 self.parent = parent 842 self.name = name 843 self._writers = set() 844 self._committed = False 845 self._segments = [] 846 self.lock = parent.root_collection().lock 847 for s in segments: 848 self._add_segment(stream, s.locator, s.range_size) 849 self._current_bblock = None 850 self._read_counter = 0
ArvadosFile constructor.
:stream: a list of Range objects representing a block stream
:segments: a list of Range objects representing segments
855 @synchronized 856 def permission_expired(self, as_of_dt=None): 857 """Returns True if any of the segment's locators is expired""" 858 for r in self._segments: 859 if KeepLocator(r.locator).permission_expired(as_of_dt): 860 return True 861 return False
Returns True if any of the segments' locators has expired.
863 @synchronized 864 def has_remote_blocks(self): 865 """Returns True if any of the segment's locators has a +R signature""" 866 867 for s in self._segments: 868 if '+R' in s.locator: 869 return True 870 return False
Returns True if any of the segments' locators has a +R signature.
899 @synchronized 900 def clone(self, new_parent, new_name): 901 """Make a copy of this file.""" 902 cp = ArvadosFile(new_parent, new_name) 903 cp.replace_contents(self) 904 return cp
Make a copy of this file.
906 @must_be_writable 907 @synchronized 908 def replace_contents(self, other): 909 """Replace segments of this file with segments from another `ArvadosFile` object.""" 910 911 map_loc = {} 912 self._segments = [] 913 for other_segment in other.segments(): 914 new_loc = other_segment.locator 915 if other.parent._my_block_manager().is_bufferblock(other_segment.locator): 916 if other_segment.locator not in map_loc: 917 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator) 918 if bufferblock.state() != _BufferBlock.WRITABLE: 919 map_loc[other_segment.locator] = bufferblock.locator() 920 else: 921 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid 922 new_loc = map_loc[other_segment.locator] 923 924 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) 925 926 self.set_committed(False)
Replace segments of this file with segments from another `ArvadosFile` object.
965 @synchronized 966 def set_committed(self, value=True): 967 """Set committed flag. 968 969 If value is True, set committed to be True. 970 971 If value is False, set committed to be False for this and all parents. 972 """ 973 if value == self._committed: 974 return 975 self._committed = value 976 if self._committed is False and self.parent is not None: 977 self.parent.set_committed(False)
Set committed flag.
If value is True, set committed to be True.
If value is False, set committed to be False for this and all parents.
979 @synchronized 980 def committed(self): 981 """Get whether this is committed or not.""" 982 return self._committed
Get whether this is committed or not.
984 @synchronized 985 def add_writer(self, writer): 986 """Add an ArvadosFileWriter reference to the list of writers""" 987 if isinstance(writer, ArvadosFileWriter): 988 self._writers.add(writer)
Add an ArvadosFileWriter reference to the list of writers
990 @synchronized 991 def remove_writer(self, writer, flush): 992 """ 993 Called from ArvadosFileWriter.close(). Remove a writer reference from the list 994 and do some block maintenance tasks. 995 """ 996 self._writers.remove(writer) 997 998 if flush or self.size() > config.KEEP_BLOCK_SIZE // 2: 999 # File writer closed, not small enough for repacking 1000 self.flush() 1001 elif self.closed(): 1002 # All writers closed and size is adequate for repacking 1003 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
Called from ArvadosFileWriter.close(). Remove a writer reference from the list and do some block maintenance tasks.
1005 def closed(self): 1006 """ 1007 Get whether this is closed or not. When the writers list is empty, the file 1008 is supposed to be closed. 1009 """ 1010 return len(self._writers) == 0
Get whether this is closed or not. When the writers list is empty, the file is supposed to be closed.
1012 @must_be_writable 1013 @synchronized 1014 def truncate(self, size): 1015 """Shrink or expand the size of the file. 1016 1017 If `size` is less than the size of the file, the file contents after 1018 `size` will be discarded. If `size` is greater than the current size 1019 of the file, it will be filled with zero bytes. 1020 1021 """ 1022 if size < self.size(): 1023 new_segs = [] 1024 for r in self._segments: 1025 range_end = r.range_start+r.range_size 1026 if r.range_start >= size: 1027 # segment is past the trucate size, all done 1028 break 1029 elif size < range_end: 1030 nr = Range(r.locator, r.range_start, size - r.range_start, 0) 1031 nr.segment_offset = r.segment_offset 1032 new_segs.append(nr) 1033 break 1034 else: 1035 new_segs.append(r) 1036 1037 self._segments = new_segs 1038 self.set_committed(False) 1039 elif size > self.size(): 1040 padding = self.parent._my_block_manager().get_padding_block() 1041 diff = size - self.size() 1042 while diff > config.KEEP_BLOCK_SIZE: 1043 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0)) 1044 diff -= config.KEEP_BLOCK_SIZE 1045 if diff > 0: 1046 self._segments.append(Range(padding.blockid, self.size(), diff, 0)) 1047 self.set_committed(False) 1048 else: 1049 # size == self.size() 1050 pass
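Growing a file with `truncate()` appends references to a shared padding block, so the new tail reads back as zero bytes; shrinking drops or trims trailing segments. A sketch, reusing the `arvfile` from the earlier collection example:

    old_size = arvfile.size()

    arvfile.truncate(old_size + 4096)   # grow: the tail is zero-filled padding
    assert arvfile.size() == old_size + 4096

    arvfile.truncate(old_size)          # shrink: discard the padding again
    assert arvfile.size() == old_size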
1052 def readfrom(self, offset, size, num_retries, exact=False, return_memoryview=False): 1053 """Read up to `size` bytes from the file starting at `offset`. 1054 1055 Arguments: 1056 1057 * exact: bool --- If False (default), return less data than 1058 requested if the read crosses a block boundary and the next 1059 block isn't cached. If True, only return less data than 1060 requested when hitting EOF. 1061 1062 * return_memoryview: bool --- If False (default) return a 1063 `bytes` object, which may entail making a copy in some 1064 situations. If True, return a `memoryview` object which may 1065 avoid making a copy, but may be incompatible with code 1066 expecting a `bytes` object. 1067 1068 """ 1069 1070 with self.lock: 1071 if size == 0 or offset >= self.size(): 1072 return memoryview(b'') if return_memoryview else b'' 1073 readsegs = locators_and_ranges(self._segments, offset, size) 1074 1075 prefetch = None 1076 prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead 1077 if prefetch_lookahead: 1078 # Doing prefetch on every read() call is surprisingly expensive 1079 # when we're trying to deliver data at 600+ MiBps and want 1080 # the read() fast path to be as lightweight as possible. 1081 # 1082 # Only prefetching every 128 read operations 1083 # dramatically reduces the overhead while still 1084 # getting the benefit of prefetching (e.g. when 1085 # reading 128 KiB at a time, it checks for prefetch 1086 # every 16 MiB). 1087 self._read_counter = (self._read_counter+1) % 128 1088 if self._read_counter == 1: 1089 prefetch = locators_and_ranges(self._segments, 1090 offset + size, 1091 config.KEEP_BLOCK_SIZE * prefetch_lookahead, 1092 limit=(1+prefetch_lookahead)) 1093 1094 locs = set() 1095 data = [] 1096 for lr in readsegs: 1097 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact)) 1098 if block: 1099 blockview = memoryview(block) 1100 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size]) 1101 locs.add(lr.locator) 1102 else: 1103 break 1104 1105 if prefetch: 1106 for lr in prefetch: 1107 if lr.locator not in locs: 1108 self.parent._my_block_manager().block_prefetch(lr.locator) 1109 locs.add(lr.locator) 1110 1111 if len(data) == 1: 1112 return data[0] if return_memoryview else data[0].tobytes() 1113 else: 1114 return memoryview(b''.join(data)) if return_memoryview else b''.join(data)
Read up to `size` bytes from the file starting at `offset`.

Arguments:

* exact: bool --- If False (default), return less data than requested if the read crosses a block boundary and the next block isn't cached. If True, only return less data than requested when hitting EOF.

* return_memoryview: bool --- If False (default), return a `bytes` object, which may entail making a copy in some situations. If True, return a `memoryview` object, which may avoid making a copy but may be incompatible with code expecting a `bytes` object.
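A short sketch of the two flags, reusing the `arvfile` from the earlier collection example (offsets and sizes are illustrative):

    # exact=True keeps fetching blocks until `size` bytes (or EOF) have been
    # collected; exact=False may stop early at an uncached block boundary.
    chunk = arvfile.readfrom(0, 5, num_retries=2, exact=True)

    # return_memoryview=True can avoid a copy when the caller accepts a view.
    view = arvfile.readfrom(0, 5, num_retries=2, exact=True, return_memoryview=True)
    assert bytes(view) == chunk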
1117 @must_be_writable 1118 @synchronized 1119 def writeto(self, offset, data, num_retries): 1120 """Write `data` to the file starting at `offset`. 1121 1122 This will update existing bytes and/or extend the size of the file as 1123 necessary. 1124 1125 """ 1126 if not isinstance(data, bytes) and not isinstance(data, memoryview): 1127 data = data.encode() 1128 if len(data) == 0: 1129 return 1130 1131 if offset > self.size(): 1132 self.truncate(offset) 1133 1134 if len(data) > config.KEEP_BLOCK_SIZE: 1135 # Chunk it up into smaller writes 1136 n = 0 1137 dataview = memoryview(data) 1138 while n < len(data): 1139 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries) 1140 n += config.KEEP_BLOCK_SIZE 1141 return 1142 1143 self.set_committed(False) 1144 1145 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: 1146 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) 1147 1148 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: 1149 self._current_bblock.repack_writes() 1150 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: 1151 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) 1152 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) 1153 1154 self._current_bblock.append(data) 1155 1156 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) 1157 1158 self.parent.notify(WRITE, self.parent, self.name, (self, self)) 1159 1160 return len(data)
Write `data` to the file starting at `offset`.

This will update existing bytes and/or extend the size of the file as necessary.
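Application code normally reaches `writeto()` through an `ArvadosFileWriter`. A sketch of a random-access overwrite, reusing the `arvfile` from the earlier collection example (which contains b'hello keep\n'):

    from arvados.arvfile import ArvadosFileWriter

    writer = ArvadosFileWriter(arvfile, mode='r+b')
    writer.seek(6)
    writer.write(b'KEEP!')   # delegates to ArvadosFile.writeto(6, b'KEEP!', ...)
    writer.close()           # close() flushes the buffer block by default

    assert arvfile.readfrom(0, 11, num_retries=2, exact=True) == b'hello KEEP!'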
1162 @synchronized 1163 def flush(self, sync=True, num_retries=0): 1164 """Flush the current bufferblock to Keep. 1165 1166 :sync: 1167 If True, commit block synchronously, wait until buffer block has been written. 1168 If False, commit block asynchronously, return immediately after putting block into 1169 the keep put queue. 1170 """ 1171 if self.committed(): 1172 return 1173 1174 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: 1175 if self._current_bblock.state() == _BufferBlock.WRITABLE: 1176 self._current_bblock.repack_writes() 1177 if self._current_bblock.state() != _BufferBlock.DELETED: 1178 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) 1179 1180 if sync: 1181 to_delete = set() 1182 for s in self._segments: 1183 bb = self.parent._my_block_manager().get_bufferblock(s.locator) 1184 if bb: 1185 if bb.state() != _BufferBlock.COMMITTED: 1186 self.parent._my_block_manager().commit_bufferblock(bb, sync=True) 1187 to_delete.add(s.locator) 1188 s.locator = bb.locator() 1189 for s in to_delete: 1190 # Don't delete the bufferblock if it's owned by many files. It'll be 1191 # deleted after all of its owners are flush()ed. 1192 if self.parent._my_block_manager().get_bufferblock(s).owner is self: 1193 self.parent._my_block_manager().delete_bufferblock(s) 1194 1195 self.parent.notify(MOD, self.parent, self.name, (self, self))
Flush the current bufferblock to Keep.
sync: If True, commit the block synchronously and wait until the buffer block has been written. If False, commit the block asynchronously and return immediately after putting the block onto the Keep put queue.
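A brief sketch, again reusing `arvfile` from the earlier collection example:

    arvfile.flush(sync=True)   # block until every buffer block is in Keep
    # With only_committed=True the manifest now covers the whole file,
    # because nothing is left in writable buffer blocks.
    print(arvfile.manifest_text(only_committed=True))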
1197 @must_be_writable 1198 @synchronized 1199 def add_segment(self, blocks, pos, size): 1200 """Add a segment to the end of the file. 1201 1202 `pos` and `offset` reference a section of the stream described by 1203 `blocks` (a list of Range objects) 1204 1205 """ 1206 self._add_segment(blocks, pos, size)
Add a segment to the end of the file.
`pos` and `offset` reference a section of the stream described by `blocks` (a list of Range objects).
1216 @synchronized 1217 def size(self): 1218 """Get the file size.""" 1219 if self._segments: 1220 n = self._segments[-1] 1221 return n.range_start + n.range_size 1222 else: 1223 return 0
Get the file size.
1225 @synchronized 1226 def manifest_text(self, stream_name=".", portable_locators=False, 1227 normalize=False, only_committed=False): 1228 buf = "" 1229 filestream = [] 1230 for segment in self._segments: 1231 loc = segment.locator 1232 if self.parent._my_block_manager().is_bufferblock(loc): 1233 if only_committed: 1234 continue 1235 loc = self.parent._my_block_manager().get_bufferblock(loc).locator() 1236 if portable_locators: 1237 loc = KeepLocator(loc).stripped() 1238 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 1239 segment.segment_offset, segment.range_size)) 1240 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream})) 1241 buf += "\n" 1242 return buf
1255class ArvadosFileReader(ArvadosFileReaderBase): 1256 """Wraps ArvadosFile in a file-like object supporting reading only. 1257 1258 Be aware that this class is NOT thread safe as there is no locking around 1259 updating file pointer. 1260 1261 """ 1262 1263 def __init__(self, arvadosfile, mode="r", num_retries=None): 1264 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries) 1265 self.arvadosfile = arvadosfile 1266 1267 def size(self): 1268 return self.arvadosfile.size() 1269 1270 def stream_name(self): 1271 return self.arvadosfile.parent.stream_name() 1272 1273 def readinto(self, b): 1274 data = self.read(len(b)) 1275 b[:len(data)] = data 1276 return len(data) 1277 1278 @_FileLikeObjectBase._before_close 1279 @retry_method 1280 def read(self, size=-1, num_retries=None, return_memoryview=False): 1281 """Read up to `size` bytes from the file and return the result. 1282 1283 Starts at the current file position. If `size` is negative or None, 1284 read the entire remainder of the file. 1285 1286 Returns None if the file pointer is at the end of the file. 1287 1288 Returns a `bytes` object, unless `return_memoryview` is True, 1289 in which case it returns a memory view, which may avoid an 1290 unnecessary data copy in some situations. 1291 1292 """ 1293 if size < 0 or size is None: 1294 data = [] 1295 # 1296 # specify exact=False, return_memoryview=True here so that we 1297 # only copy data once into the final buffer. 1298 # 1299 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True) 1300 while rd: 1301 data.append(rd) 1302 self._filepos += len(rd) 1303 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True) 1304 return memoryview(b''.join(data)) if return_memoryview else b''.join(data) 1305 else: 1306 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview) 1307 self._filepos += len(data) 1308 return data 1309 1310 @_FileLikeObjectBase._before_close 1311 @retry_method 1312 def readfrom(self, offset, size, num_retries=None, return_memoryview=False): 1313 """Read up to `size` bytes from the stream, starting at the specified file offset. 1314 1315 This method does not change the file position. 1316 1317 Returns a `bytes` object, unless `return_memoryview` is True, 1318 in which case it returns a memory view, which may avoid an 1319 unnecessary data copy in some situations. 1320 1321 """ 1322 return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview) 1323 1324 def flush(self): 1325 pass
Wraps ArvadosFile in a file-like object supporting reading only.
Be aware that this class is NOT thread safe as there is no locking around updating file pointer.
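Because the file position lives in the reader, concurrent access should use one reader per thread over the shared (thread-safe) `ArvadosFile`. A sketch, reusing `arvfile` from the earlier collection example:

    import threading
    from arvados.arvfile import ArvadosFileReader

    def read_range(arvfile, offset, size, out, idx):
        # One reader per thread; only the underlying ArvadosFile is shared.
        reader = ArvadosFileReader(arvfile, num_retries=2)
        out[idx] = reader.readfrom(offset, size)

    results = [None, None]
    threads = [
        threading.Thread(target=read_range, args=(arvfile, i * 4, 4, results, i))
        for i in range(2)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(b''.join(results))   # the first 8 bytes of the file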
1278 @_FileLikeObjectBase._before_close 1279 @retry_method 1280 def read(self, size=-1, num_retries=None, return_memoryview=False): 1281 """Read up to `size` bytes from the file and return the result. 1282 1283 Starts at the current file position. If `size` is negative or None, 1284 read the entire remainder of the file. 1285 1286 Returns None if the file pointer is at the end of the file. 1287 1288 Returns a `bytes` object, unless `return_memoryview` is True, 1289 in which case it returns a memory view, which may avoid an 1290 unnecessary data copy in some situations. 1291 1292 """ 1293 if size < 0 or size is None: 1294 data = [] 1295 # 1296 # specify exact=False, return_memoryview=True here so that we 1297 # only copy data once into the final buffer. 1298 # 1299 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True) 1300 while rd: 1301 data.append(rd) 1302 self._filepos += len(rd) 1303 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries, exact=False, return_memoryview=True) 1304 return memoryview(b''.join(data)) if return_memoryview else b''.join(data) 1305 else: 1306 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True, return_memoryview=return_memoryview) 1307 self._filepos += len(data) 1308 return data
Read up to `size` bytes from the file and return the result.

Starts at the current file position. If `size` is negative or None, read the entire remainder of the file.

Returns an empty result if the file pointer is already at the end of the file.

Returns a `bytes` object, unless `return_memoryview` is True, in which case it returns a memory view, which may avoid an unnecessary data copy in some situations.
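For example, reading the whole file versus a bounded chunk (reusing `arvfile` from the earlier collection example):

    reader = ArvadosFileReader(arvfile, num_retries=2)
    whole = reader.read()    # size defaults to -1: read to end of file
    reader.seek(0)
    head = reader.read(5)    # at most 5 bytes from the current position
    print(len(whole), head)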
1310 @_FileLikeObjectBase._before_close 1311 @retry_method 1312 def readfrom(self, offset, size, num_retries=None, return_memoryview=False): 1313 """Read up to `size` bytes from the stream, starting at the specified file offset. 1314 1315 This method does not change the file position. 1316 1317 Returns a `bytes` object, unless `return_memoryview` is True, 1318 in which case it returns a memory view, which may avoid an 1319 unnecessary data copy in some situations. 1320 1321 """ 1322 return self.arvadosfile.readfrom(offset, size, num_retries, exact=True, return_memoryview=return_memoryview)
Read up to `size` bytes from the stream, starting at the specified file offset.

This method does not change the file position.

Returns a `bytes` object, unless `return_memoryview` is True, in which case it returns a memory view, which may avoid an unnecessary data copy in some situations.
1328class ArvadosFileWriter(ArvadosFileReader): 1329 """Wraps ArvadosFile in a file-like object supporting both reading and writing. 1330 1331 Be aware that this class is NOT thread safe as there is no locking around 1332 updating file pointer. 1333 1334 """ 1335 1336 def __init__(self, arvadosfile, mode, num_retries=None): 1337 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries) 1338 self.arvadosfile.add_writer(self) 1339 1340 def writable(self): 1341 return True 1342 1343 @_FileLikeObjectBase._before_close 1344 @retry_method 1345 def write(self, data, num_retries=None): 1346 if self.mode[0] == "a": 1347 self._filepos = self.size() 1348 self.arvadosfile.writeto(self._filepos, data, num_retries) 1349 self._filepos += len(data) 1350 return len(data) 1351 1352 @_FileLikeObjectBase._before_close 1353 @retry_method 1354 def writelines(self, seq, num_retries=None): 1355 for s in seq: 1356 self.write(s, num_retries=num_retries) 1357 1358 @_FileLikeObjectBase._before_close 1359 def truncate(self, size=None): 1360 if size is None: 1361 size = self._filepos 1362 self.arvadosfile.truncate(size) 1363 1364 @_FileLikeObjectBase._before_close 1365 def flush(self): 1366 self.arvadosfile.flush() 1367 1368 def close(self, flush=True): 1369 if not self.closed: 1370 self.arvadosfile.remove_writer(self, flush) 1371 super(ArvadosFileWriter, self).close()
Wraps ArvadosFile in a file-like object supporting both reading and writing.
Be aware that this class is NOT thread safe as there is no locking around updating file pointer.
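A sketch of append-mode writing, reusing `arvfile` from the earlier collection example:

    from arvados.arvfile import ArvadosFileWriter

    writer = ArvadosFileWriter(arvfile, mode='ab')
    writer.write(b'appended\n')   # mode 'a': every write lands at end of file
    writer.close()                # close(flush=True) commits the buffer block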
1374class WrappableFile(object): 1375 """An interface to an Arvados file that's compatible with io wrappers. 1376 1377 """ 1378 def __init__(self, f): 1379 self.f = f 1380 self.closed = False 1381 def close(self): 1382 self.closed = True 1383 return self.f.close() 1384 def flush(self): 1385 return self.f.flush() 1386 def read(self, *args, **kwargs): 1387 return self.f.read(*args, **kwargs) 1388 def readable(self): 1389 return self.f.readable() 1390 def readinto(self, *args, **kwargs): 1391 return self.f.readinto(*args, **kwargs) 1392 def seek(self, *args, **kwargs): 1393 return self.f.seek(*args, **kwargs) 1394 def seekable(self): 1395 return self.f.seekable() 1396 def tell(self): 1397 return self.f.tell() 1398 def writable(self): 1399 return self.f.writable() 1400 def write(self, *args, **kwargs): 1401 return self.f.write(*args, **kwargs)
An interface to an Arvados file that’s compatible with io wrappers.
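A sketch of layering text decoding on top of an Arvados file with the standard io machinery, reusing `arvfile` from the earlier collection example:

    import io
    from arvados.arvfile import ArvadosFileReader, WrappableFile

    raw = WrappableFile(ArvadosFileReader(arvfile, num_retries=2))
    text = io.TextIOWrapper(raw, encoding='utf-8')
    for line in text:
        print(line.rstrip())
    text.close()   # closes the wrapped reader as well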