Package arvados :: Module arvfile

Source Code for Module arvados.arvfile

   1  # Copyright (C) The Arvados Authors. All rights reserved. 
   2  # 
   3  # SPDX-License-Identifier: Apache-2.0 
   4   
   5  from __future__ import absolute_import 
   6  from __future__ import division 
   7  from future import standard_library 
   8  from future.utils import listitems, listvalues 
   9  standard_library.install_aliases() 
  10  from builtins import range 
  11  from builtins import object 
  12  import bz2 
  13  import collections 
  14  import copy 
  15  import errno 
  16  import functools 
  17  import hashlib 
  18  import logging 
  19  import os 
  20  import queue 
  21  import re 
  22  import sys 
  23  import threading 
  24  import uuid 
  25  import zlib 
  26   
  27  from . import config 
  28  from .errors import KeepWriteError, AssertionError, ArgumentError 
  29  from .keep import KeepLocator 
  30  from ._normalize_stream import normalize_stream 
  31  from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange 
  32  from .retry import retry_method 
  33   
  34  MOD = "mod" 
  35  WRITE = "write" 
  36   
  37  _logger = logging.getLogger('arvados.arvfile') 
38 39 -def split(path):
40 """split(path) -> streamname, filename 41 42 Separate the stream name and file name in a /-separated stream path and 43 return a tuple (stream_name, file_name). If no stream name is available, 44 assume '.'. 45 46 """ 47 try: 48 stream_name, file_name = path.rsplit('/', 1) 49 except ValueError: # No / in string 50 stream_name, file_name = '.', path 51 return stream_name, file_name
52
53 54 -class UnownedBlockError(Exception):
55 """Raised when there's an writable block without an owner on the BlockManager.""" 56 pass
57
58 59 -class _FileLikeObjectBase(object):
60 - def __init__(self, name, mode):
61 self.name = name 62 self.mode = mode 63 self.closed = False
64 65 @staticmethod
66 - def _before_close(orig_func):
67 @functools.wraps(orig_func) 68 def before_close_wrapper(self, *args, **kwargs): 69 if self.closed: 70 raise ValueError("I/O operation on closed stream file") 71 return orig_func(self, *args, **kwargs)
72 return before_close_wrapper
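Illustrative sketch of how the _before_close guard behaves on a subclass (the _Example class below is hypothetical, shown only to demonstrate the decorator):

    from arvados.arvfile import _FileLikeObjectBase

    class _Example(_FileLikeObjectBase):
        @_FileLikeObjectBase._before_close
        def peek_name(self):
            return self.name

    f = _Example('example.txt', 'rb')
    assert f.peek_name() == 'example.txt'
    f.close()
    try:
        f.peek_name()
    except ValueError as e:
        print(e)   # "I/O operation on closed stream file"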
73
74 - def __enter__(self):
75 return self
76
77 - def __exit__(self, exc_type, exc_value, traceback):
78 try: 79 self.close() 80 except Exception: 81 if exc_type is None: 82 raise
83
84 - def close(self):
85 self.closed = True
86
87 88 -class ArvadosFileReaderBase(_FileLikeObjectBase):
89 - def __init__(self, name, mode, num_retries=None):
90 super(ArvadosFileReaderBase, self).__init__(name, mode) 91 self._binary = 'b' in mode 92 if sys.version_info >= (3, 0) and not self._binary: 93 raise NotImplementedError("text mode {!r} is not implemented".format(mode)) 94 self._filepos = 0 95 self.num_retries = num_retries 96 self._readline_cache = (None, None)
97
98 - def __iter__(self):
99 while True: 100 data = self.readline() 101 if not data: 102 break 103 yield data
104
105 - def decompressed_name(self):
106 return re.sub(r'\.(bz2|gz)$', '', self.name)
107 108 @_FileLikeObjectBase._before_close
109 - def seek(self, pos, whence=os.SEEK_SET):
110 if whence == os.SEEK_CUR: 111 pos += self._filepos 112 elif whence == os.SEEK_END: 113 pos += self.size() 114 if pos < 0: 115 raise IOError(errno.EINVAL, "Tried to seek to negative file offset.") 116 self._filepos = pos 117 return self._filepos
118
119 - def tell(self):
120 return self._filepos
121
122 - def readable(self):
123 return True
124
125 - def writable(self):
126 return False
127
128 - def seekable(self):
129 return True
130 131 @_FileLikeObjectBase._before_close 132 @retry_method
133 - def readall(self, size=2**20, num_retries=None):
134 while True: 135 data = self.read(size, num_retries=num_retries) 136 if len(data) == 0: 137 break 138 yield data
139 140 @_FileLikeObjectBase._before_close 141 @retry_method
142 - def readline(self, size=float('inf'), num_retries=None):
143 cache_pos, cache_data = self._readline_cache 144 if self.tell() == cache_pos: 145 data = [cache_data] 146 self._filepos += len(cache_data) 147 else: 148 data = [b''] 149 data_size = len(data[-1]) 150 while (data_size < size) and (b'\n' not in data[-1]): 151 next_read = self.read(2 ** 20, num_retries=num_retries) 152 if not next_read: 153 break 154 data.append(next_read) 155 data_size += len(next_read) 156 data = b''.join(data) 157 try: 158 nextline_index = data.index(b'\n') + 1 159 except ValueError: 160 nextline_index = len(data) 161 nextline_index = min(nextline_index, size) 162 self._filepos -= len(data) - nextline_index 163 self._readline_cache = (self.tell(), data[nextline_index:]) 164 return data[:nextline_index].decode()
165 166 @_FileLikeObjectBase._before_close 167 @retry_method
168 - def decompress(self, decompress, size, num_retries=None):
169 for segment in self.readall(size, num_retries=num_retries): 170 data = decompress(segment) 171 if data: 172 yield data
173 174 @_FileLikeObjectBase._before_close 175 @retry_method
176 - def readall_decompressed(self, size=2**20, num_retries=None):
177 self.seek(0) 178 if self.name.endswith('.bz2'): 179 dc = bz2.BZ2Decompressor() 180 return self.decompress(dc.decompress, size, 181 num_retries=num_retries) 182 elif self.name.endswith('.gz'): 183 dc = zlib.decompressobj(16+zlib.MAX_WBITS) 184 return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), 185 size, num_retries=num_retries) 186 else: 187 return self.readall(size, num_retries=num_retries)
188 189 @_FileLikeObjectBase._before_close 190 @retry_method
191 - def readlines(self, sizehint=float('inf'), num_retries=None):
192 data = [] 193 data_size = 0 194 for s in self.readall(num_retries=num_retries): 195 data.append(s) 196 data_size += len(s) 197 if data_size >= sizehint: 198 break 199 return b''.join(data).decode().splitlines(True)
200
201 - def size(self):
202 raise IOError(errno.ENOSYS, "Not implemented")
203
204 - def read(self, size, num_retries=None):
205 raise IOError(errno.ENOSYS, "Not implemented")
206
207 - def readfrom(self, start, size, num_retries=None):
208 raise IOError(errno.ENOSYS, "Not implemented")
209
210 211 -class StreamFileReader(ArvadosFileReaderBase):
212 - class _NameAttribute(str):
213 # The Python file API provides a plain .name attribute. 214 # Older SDK versions provided a name() method. 215 # This class provides both, for maximum compatibility.
216 - def __call__(self):
217 return self
218
219 - def __init__(self, stream, segments, name):
220 super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries) 221 self._stream = stream 222 self.segments = segments
223
224 - def stream_name(self):
225 return self._stream.name()
226
227 - def size(self):
228 n = self.segments[-1] 229 return n.range_start + n.range_size
230 231 @_FileLikeObjectBase._before_close 232 @retry_method
233 - def read(self, size, num_retries=None):
234 """Read up to 'size' bytes from the stream, starting at the current file position""" 235 if size == 0: 236 return b'' 237 238 data = b'' 239 available_chunks = locators_and_ranges(self.segments, self._filepos, size) 240 if available_chunks: 241 lr = available_chunks[0] 242 data = self._stream.readfrom(lr.locator+lr.segment_offset, 243 lr.segment_size, 244 num_retries=num_retries) 245 246 self._filepos += len(data) 247 return data
248 249 @_FileLikeObjectBase._before_close 250 @retry_method
251 - def readfrom(self, start, size, num_retries=None):
252 """Read up to 'size' bytes from the stream, starting at 'start'""" 253 if size == 0: 254 return b'' 255 256 data = [] 257 for lr in locators_and_ranges(self.segments, start, size): 258 data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size, 259 num_retries=num_retries)) 260 return b''.join(data)
261
262 - def as_manifest(self):
263 segs = [] 264 for r in self.segments: 265 segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size)) 266 return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
267
268 269 -def synchronized(orig_func):
270 @functools.wraps(orig_func) 271 def synchronized_wrapper(self, *args, **kwargs): 272 with self.lock: 273 return orig_func(self, *args, **kwargs)
274 return synchronized_wrapper 275
276 277 -class StateChangeError(Exception):
278 - def __init__(self, message, state, nextstate):
279 super(StateChangeError, self).__init__(message) 280 self.state = state 281 self.nextstate = nextstate
282
283 -class _BufferBlock(object):
284 """A stand-in for a Keep block that is in the process of being written. 285 286 Writers can append to it, get the size, and compute the Keep locator. 287 There are three valid states: 288 289 WRITABLE 290 Can append to block. 291 292 PENDING 293 Block is in the process of being uploaded to Keep, append is an error. 294 295 COMMITTED 296 The block has been written to Keep, its internal buffer has been 297 released, fetching the block will fetch it via keep client (since we 298 discarded the internal copy), and identifiers referring to the BufferBlock 299 can be replaced with the block locator. 300 301 """ 302 303 WRITABLE = 0 304 PENDING = 1 305 COMMITTED = 2 306 ERROR = 3 307 DELETED = 4 308
309 - def __init__(self, blockid, starting_capacity, owner):
310 """ 311 :blockid: 312 the identifier for this block 313 314 :starting_capacity: 315 the initial buffer capacity 316 317 :owner: 318 ArvadosFile that owns this block 319 320 """ 321 self.blockid = blockid 322 self.buffer_block = bytearray(starting_capacity) 323 self.buffer_view = memoryview(self.buffer_block) 324 self.write_pointer = 0 325 self._state = _BufferBlock.WRITABLE 326 self._locator = None 327 self.owner = owner 328 self.lock = threading.Lock() 329 self.wait_for_commit = threading.Event() 330 self.error = None
331 332 @synchronized
333 - def append(self, data):
334 """Append some data to the buffer. 335 336 Only valid if the block is in WRITABLE state. Implements an expanding 337 buffer, doubling capacity as needed to accomdate all the data. 338 339 """ 340 if self._state == _BufferBlock.WRITABLE: 341 if not isinstance(data, bytes) and not isinstance(data, memoryview): 342 data = data.encode() 343 while (self.write_pointer+len(data)) > len(self.buffer_block): 344 new_buffer_block = bytearray(len(self.buffer_block) * 2) 345 new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer] 346 self.buffer_block = new_buffer_block 347 self.buffer_view = memoryview(self.buffer_block) 348 self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data 349 self.write_pointer += len(data) 350 self._locator = None 351 else: 352 raise AssertionError("Buffer block is not writable")
353 354 STATE_TRANSITIONS = frozenset([ 355 (WRITABLE, PENDING), 356 (PENDING, COMMITTED), 357 (PENDING, ERROR), 358 (ERROR, PENDING)]) 359 360 @synchronized
361 - def set_state(self, nextstate, val=None):
362 if (self._state, nextstate) not in self.STATE_TRANSITIONS: 363 raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate) 364 self._state = nextstate 365 366 if self._state == _BufferBlock.PENDING: 367 self.wait_for_commit.clear() 368 369 if self._state == _BufferBlock.COMMITTED: 370 self._locator = val 371 self.buffer_view = None 372 self.buffer_block = None 373 self.wait_for_commit.set() 374 375 if self._state == _BufferBlock.ERROR: 376 self.error = val 377 self.wait_for_commit.set()
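Illustrative sketch of the allowed state transitions, exercising the private class directly (for demonstration only; real callers go through _BlockManager):

    from arvados.arvfile import _BufferBlock, StateChangeError

    bb = _BufferBlock('example-block-id', starting_capacity=16, owner=None)
    bb.append(b'hello')
    bb.set_state(_BufferBlock.PENDING)                   # WRITABLE -> PENDING: allowed
    bb.set_state(_BufferBlock.COMMITTED, 'locator+5')    # PENDING -> COMMITTED: allowed
    try:
        bb.set_state(_BufferBlock.PENDING)               # COMMITTED -> PENDING: rejected
    except StateChangeError as e:
        print(e.state, e.nextstate)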
378 379 @synchronized
380 - def state(self):
381 return self._state
382
383 - def size(self):
384 """The amount of data written to the buffer.""" 385 return self.write_pointer
386 387 @synchronized
388 - def locator(self):
389 """The Keep locator for this buffer's contents.""" 390 if self._locator is None: 391 self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size()) 392 return self._locator
393 394 @synchronized
395 - def clone(self, new_blockid, owner):
396 if self._state == _BufferBlock.COMMITTED: 397 raise AssertionError("Cannot duplicate committed buffer block") 398 bufferblock = _BufferBlock(new_blockid, self.size(), owner) 399 bufferblock.append(self.buffer_view[0:self.size()]) 400 return bufferblock
401 402 @synchronized
403 - def clear(self):
404 self._state = _BufferBlock.DELETED 405 self.owner = None 406 self.buffer_block = None 407 self.buffer_view = None
408 409 @synchronized
410 - def repack_writes(self):
411 """Optimize buffer block by repacking segments in file sequence. 412 413 When the client makes random writes, they appear in the buffer block in 414 the sequence they were written rather than the sequence they appear in 415 the file. This makes for inefficient, fragmented manifests. Attempt 416 to optimize by repacking writes in file sequence. 417 418 """ 419 if self._state != _BufferBlock.WRITABLE: 420 raise AssertionError("Cannot repack non-writable block") 421 422 segs = self.owner.segments() 423 424 # Collect the segments that reference the buffer block. 425 bufferblock_segs = [s for s in segs if s.locator == self.blockid] 426 427 # Collect total data referenced by segments (could be smaller than 428 # bufferblock size if a portion of the file was written and 429 # then overwritten). 430 write_total = sum([s.range_size for s in bufferblock_segs]) 431 432 if write_total < self.size() or len(bufferblock_segs) > 1: 433 # If there's more than one segment referencing this block, it is 434 # due to out-of-order writes and will produce a fragmented 435 # manifest, so try to optimize by re-packing into a new buffer. 436 contents = self.buffer_view[0:self.write_pointer].tobytes() 437 new_bb = _BufferBlock(None, write_total, None) 438 for t in bufferblock_segs: 439 new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size]) 440 t.segment_offset = new_bb.size() - t.range_size 441 442 self.buffer_block = new_bb.buffer_block 443 self.buffer_view = new_bb.buffer_view 444 self.write_pointer = new_bb.write_pointer 445 self._locator = None 446 new_bb.clear() 447 self.owner.set_segments(segs)
448
449 - def __repr__(self):
450 return "<BufferBlock %s>" % (self.blockid)
451
452 453 -class NoopLock(object):
454 - def __enter__(self):
455 return self
456
457 - def __exit__(self, exc_type, exc_value, traceback):
458 pass
459
460 - def acquire(self, blocking=False):
461 pass
462
463 - def release(self):
464 pass
465
466 467 -def must_be_writable(orig_func):
468 @functools.wraps(orig_func) 469 def must_be_writable_wrapper(self, *args, **kwargs): 470 if not self.writable(): 471 raise IOError(errno.EROFS, "Collection is read-only.") 472 return orig_func(self, *args, **kwargs)
473 return must_be_writable_wrapper 474
475 476 -class _BlockManager(object):
477 """BlockManager handles buffer blocks. 478 479 Also handles background block uploads, and background block prefetch for a 480 Collection of ArvadosFiles. 481 482 """ 483 484 DEFAULT_PUT_THREADS = 2 485 DEFAULT_GET_THREADS = 2 486
487 - def __init__(self, keep, copies=None, put_threads=None):
488 """keep: KeepClient object to use""" 489 self._keep = keep 490 self._bufferblocks = collections.OrderedDict() 491 self._put_queue = None 492 self._put_threads = None 493 self._prefetch_queue = None 494 self._prefetch_threads = None 495 self.lock = threading.Lock() 496 self.prefetch_enabled = True 497 if put_threads: 498 self.num_put_threads = put_threads 499 else: 500 self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS 501 self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS 502 self.copies = copies 503 self._pending_write_size = 0 504 self.threads_lock = threading.Lock() 505 self.padding_block = None
506 507 @synchronized
508 - def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
509 """Allocate a new, empty bufferblock in WRITABLE state and return it. 510 511 :blockid: 512 optional block identifier, otherwise one will be automatically assigned 513 514 :starting_capacity: 515 optional capacity, otherwise will use default capacity 516 517 :owner: 518 ArvadosFile that owns this block 519 520 """ 521 return self._alloc_bufferblock(blockid, starting_capacity, owner)
522
523 - def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
524 if blockid is None: 525 blockid = str(uuid.uuid4()) 526 bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner) 527 self._bufferblocks[bufferblock.blockid] = bufferblock 528 return bufferblock
529 530 @synchronized
531 - def dup_block(self, block, owner):
532 """Create a new bufferblock initialized with the content of an existing bufferblock. 533 534 :block: 535 the buffer block to copy. 536 537 :owner: 538 ArvadosFile that owns the new block 539 540 """ 541 new_blockid = str(uuid.uuid4()) 542 bufferblock = block.clone(new_blockid, owner) 543 self._bufferblocks[bufferblock.blockid] = bufferblock 544 return bufferblock
545 546 @synchronized
547 - def is_bufferblock(self, locator):
548 return locator in self._bufferblocks
549
550 - def _commit_bufferblock_worker(self):
551 """Background uploader thread.""" 552 553 while True: 554 try: 555 bufferblock = self._put_queue.get() 556 if bufferblock is None: 557 return 558 559 if self.copies is None: 560 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) 561 else: 562 loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) 563 bufferblock.set_state(_BufferBlock.COMMITTED, loc) 564 except Exception as e: 565 bufferblock.set_state(_BufferBlock.ERROR, e) 566 finally: 567 if self._put_queue is not None: 568 self._put_queue.task_done()
569
570 - def start_put_threads(self):
571 with self.threads_lock: 572 if self._put_threads is None: 573 # Start uploader threads. 574 575 # If we don't limit the Queue size, the upload queue can quickly 576 # grow to take up gigabytes of RAM if the writing process is 577 # generating data more quickly than it can be sent to the Keep 578 # servers. 579 # 580 # With two upload threads and a queue size of 2, this means up to 4 581 # blocks pending. If they are full 64 MiB blocks, that means up to 582 # 256 MiB of internal buffering, which is the same size as the 583 # default download block cache in KeepClient. 584 self._put_queue = queue.Queue(maxsize=2) 585 586 self._put_threads = [] 587 for i in range(0, self.num_put_threads): 588 thread = threading.Thread(target=self._commit_bufferblock_worker) 589 self._put_threads.append(thread) 590 thread.daemon = True 591 thread.start()
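The bounded-queue pattern described in the comment above can be sketched on its own; fake_put below is a stand-in for KeepClient.put(), not part of this module:

    import queue
    import threading

    # A maxsize of 2 makes q.put() block once two blocks are already waiting,
    # capping how much data the writer can buffer ahead of the upload workers.
    q = queue.Queue(maxsize=2)

    def fake_put(block):
        pass   # pretend to upload the block

    def worker():
        while True:
            block = q.get()
            try:
                if block is None:   # sentinel: shut this worker down
                    return
                fake_put(block)
            finally:
                q.task_done()

    threads = [threading.Thread(target=worker, daemon=True) for _ in range(2)]
    for t in threads:
        t.start()
    for data in (b'a', b'b', b'c', b'd'):
        q.put(data)                 # blocks while the queue is full
    for _ in threads:
        q.put(None)
    q.join()                        # wait until every queued item is processed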
592
593 - def _block_prefetch_worker(self):
594 """The background downloader thread.""" 595 while True: 596 try: 597 b = self._prefetch_queue.get() 598 if b is None: 599 return 600 self._keep.get(b) 601 except Exception: 602 _logger.exception("Exception doing block prefetch")
603 604 @synchronized
605 - def start_get_threads(self):
606 if self._prefetch_threads is None: 607 self._prefetch_queue = queue.Queue() 608 self._prefetch_threads = [] 609 for i in range(0, self.num_get_threads): 610 thread = threading.Thread(target=self._block_prefetch_worker) 611 self._prefetch_threads.append(thread) 612 thread.daemon = True 613 thread.start()
614 615 616 @synchronized
617 - def stop_threads(self):
618 """Shut down and wait for background upload and download threads to finish.""" 619 620 if self._put_threads is not None: 621 for t in self._put_threads: 622 self._put_queue.put(None) 623 for t in self._put_threads: 624 t.join() 625 self._put_threads = None 626 self._put_queue = None 627 628 if self._prefetch_threads is not None: 629 for t in self._prefetch_threads: 630 self._prefetch_queue.put(None) 631 for t in self._prefetch_threads: 632 t.join() 633 self._prefetch_threads = None 634 self._prefetch_queue = None
635
636 - def __enter__(self):
637 return self
638
639 - def __exit__(self, exc_type, exc_value, traceback):
640 self.stop_threads()
641 642 @synchronized
643 - def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
644 """Packs small blocks together before uploading""" 645 646 self._pending_write_size += closed_file_size 647 648 # Check if there are enough small blocks for filling up one in full 649 if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)): 650 return 651 652 # Search blocks ready for getting packed together before being 653 # committed to Keep. 654 # A WRITABLE block always has an owner. 655 # A WRITABLE block with its owner.closed() implies that its 656 # size is <= KEEP_BLOCK_SIZE/2. 657 try: 658 small_blocks = [b for b in listvalues(self._bufferblocks) 659 if b.state() == _BufferBlock.WRITABLE and b.owner.closed()] 660 except AttributeError: 661 # Writable blocks without owner shouldn't exist. 662 raise UnownedBlockError() 663 664 if len(small_blocks) <= 1: 665 # Not enough small blocks for repacking 666 return 667 668 for bb in small_blocks: 669 bb.repack_writes() 670 671 # Update the pending write size count with its true value, just in case 672 # some small file was opened, written and closed several times. 673 self._pending_write_size = sum([b.size() for b in small_blocks]) 674 675 if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force: 676 return 677 678 new_bb = self._alloc_bufferblock() 679 new_bb.owner = [] 680 files = [] 681 while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE: 682 bb = small_blocks.pop(0) 683 new_bb.owner.append(bb.owner) 684 self._pending_write_size -= bb.size() 685 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes()) 686 files.append((bb, new_bb.write_pointer - bb.size())) 687 688 self.commit_bufferblock(new_bb, sync=sync) 689 690 for bb, new_bb_segment_offset in files: 691 newsegs = bb.owner.segments() 692 for s in newsegs: 693 if s.locator == bb.blockid: 694 s.locator = new_bb.blockid 695 s.segment_offset = new_bb_segment_offset+s.segment_offset 696 bb.owner.set_segments(newsegs) 697 self._delete_bufferblock(bb.blockid)
698
699 - def commit_bufferblock(self, block, sync):
700 """Initiate a background upload of a bufferblock. 701 702 :block: 703 The block object to upload 704 705 :sync: 706 If `sync` is True, upload the block synchronously. 707 If `sync` is False, upload the block asynchronously. This will 708 return immediately unless the upload queue is at capacity, in 709 which case it will wait on an upload queue slot. 710 711 """ 712 try: 713 # Mark the block as PENDING so to disallow any more appends. 714 block.set_state(_BufferBlock.PENDING) 715 except StateChangeError as e: 716 if e.state == _BufferBlock.PENDING: 717 if sync: 718 block.wait_for_commit.wait() 719 else: 720 return 721 if block.state() == _BufferBlock.COMMITTED: 722 return 723 elif block.state() == _BufferBlock.ERROR: 724 raise block.error 725 else: 726 raise 727 728 if sync: 729 try: 730 if self.copies is None: 731 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) 732 else: 733 loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies) 734 block.set_state(_BufferBlock.COMMITTED, loc) 735 except Exception as e: 736 block.set_state(_BufferBlock.ERROR, e) 737 raise 738 else: 739 self.start_put_threads() 740 self._put_queue.put(block)
741 742 @synchronized
743 - def get_bufferblock(self, locator):
744 return self._bufferblocks.get(locator)
745 746 @synchronized
747 - def get_padding_block(self):
748 """Get a bufferblock 64 MB in size consisting of all zeros, used as padding 749 when using truncate() to extend the size of a file. 750 751 For reference (and possible future optimization), the md5sum of the 752 padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864 753 754 """ 755 756 if self.padding_block is None: 757 self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE) 758 self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE 759 self.commit_bufferblock(self.padding_block, False) 760 return self.padding_block
761 762 @synchronized
763 - def delete_bufferblock(self, locator):
764 self._delete_bufferblock(locator)
765
766 - def _delete_bufferblock(self, locator):
767 bb = self._bufferblocks[locator] 768 bb.clear() 769 del self._bufferblocks[locator]
770
771 - def get_block_contents(self, locator, num_retries, cache_only=False):
772 """Fetch a block. 773 774 First checks to see if the locator is a BufferBlock and return that, if 775 not, passes the request through to KeepClient.get(). 776 777 """ 778 with self.lock: 779 if locator in self._bufferblocks: 780 bufferblock = self._bufferblocks[locator] 781 if bufferblock.state() != _BufferBlock.COMMITTED: 782 return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes() 783 else: 784 locator = bufferblock._locator 785 if cache_only: 786 return self._keep.get_from_cache(locator) 787 else: 788 return self._keep.get(locator, num_retries=num_retries)
789
790 - def commit_all(self):
791 """Commit all outstanding buffer blocks. 792 793 This is a synchronous call, and will not return until all buffer blocks 794 are uploaded. Raises KeepWriteError() if any blocks failed to upload. 795 796 """ 797 self.repack_small_blocks(force=True, sync=True) 798 799 with self.lock: 800 items = listitems(self._bufferblocks) 801 802 for k,v in items: 803 if v.state() != _BufferBlock.COMMITTED and v.owner: 804 # Ignore blocks with a list of owners, as if they're not in COMMITTED 805 # state, they're already being committed asynchronously. 806 if isinstance(v.owner, ArvadosFile): 807 v.owner.flush(sync=False) 808 809 with self.lock: 810 if self._put_queue is not None: 811 self._put_queue.join() 812 813 err = [] 814 for k,v in items: 815 if v.state() == _BufferBlock.ERROR: 816 err.append((v.locator(), v.error)) 817 if err: 818 raise KeepWriteError("Error writing some blocks", err, label="block") 819 820 for k,v in items: 821 # flush again with sync=True to remove committed bufferblocks from 822 # the segments. 823 if v.owner: 824 if isinstance(v.owner, ArvadosFile): 825 v.owner.flush(sync=True) 826 elif isinstance(v.owner, list) and len(v.owner) > 0: 827 # This bufferblock is referenced by many files as a result 828 # of repacking small blocks, so don't delete it when flushing 829 # its owners, just do it after flushing them all. 830 for owner in v.owner: 831 owner.flush(sync=True) 832 self.delete_bufferblock(k)
833
834 - def block_prefetch(self, locator):
835 """Initiate a background download of a block. 836 837 This assumes that the underlying KeepClient implements a block cache, 838 so repeated requests for the same block will not result in repeated 839 downloads (unless the block is evicted from the cache.) This method 840 does not block. 841 842 """ 843 844 if not self.prefetch_enabled: 845 return 846 847 if self._keep.get_from_cache(locator) is not None: 848 return 849 850 with self.lock: 851 if locator in self._bufferblocks: 852 return 853 854 self.start_get_threads() 855 self._prefetch_queue.put(locator)
856
857 858 -class ArvadosFile(object):
859 """Represent a file in a Collection. 860 861 ArvadosFile manages the underlying representation of a file in Keep as a 862 sequence of segments spanning a set of blocks, and implements random 863 read/write access. 864 865 This object may be accessed from multiple threads. 866 867 """ 868 869 __slots__ = ('parent', 'name', '_writers', '_committed', 870 '_segments', 'lock', '_current_bblock', 'fuse_entry') 871
872 - def __init__(self, parent, name, stream=[], segments=[]):
873 """ 874 ArvadosFile constructor. 875 876 :stream: 877 a list of Range objects representing a block stream 878 879 :segments: 880 a list of Range objects representing segments 881 """ 882 self.parent = parent 883 self.name = name 884 self._writers = set() 885 self._committed = False 886 self._segments = [] 887 self.lock = parent.root_collection().lock 888 for s in segments: 889 self._add_segment(stream, s.locator, s.range_size) 890 self._current_bblock = None
891
892 - def writable(self):
893 return self.parent.writable()
894 895 @synchronized
896 - def permission_expired(self, as_of_dt=None):
897 """Returns True if any of the segment's locators is expired""" 898 for r in self._segments: 899 if KeepLocator(r.locator).permission_expired(as_of_dt): 900 return True 901 return False
902 903 @synchronized
904 - def segments(self):
905 return copy.copy(self._segments)
906 907 @synchronized
908 - def clone(self, new_parent, new_name):
909 """Make a copy of this file.""" 910 cp = ArvadosFile(new_parent, new_name) 911 cp.replace_contents(self) 912 return cp
913 914 @must_be_writable 915 @synchronized
916 - def replace_contents(self, other):
917 """Replace segments of this file with segments from another `ArvadosFile` object.""" 918 919 map_loc = {} 920 self._segments = [] 921 for other_segment in other.segments(): 922 new_loc = other_segment.locator 923 if other.parent._my_block_manager().is_bufferblock(other_segment.locator): 924 if other_segment.locator not in map_loc: 925 bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator) 926 if bufferblock.state() != _BufferBlock.WRITABLE: 927 map_loc[other_segment.locator] = bufferblock.locator() 928 else: 929 map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid 930 new_loc = map_loc[other_segment.locator] 931 932 self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset)) 933 934 self.set_committed(False)
935
936 - def __eq__(self, other):
937 if other is self: 938 return True 939 if not isinstance(other, ArvadosFile): 940 return False 941 942 othersegs = other.segments() 943 with self.lock: 944 if len(self._segments) != len(othersegs): 945 return False 946 for i in range(0, len(othersegs)): 947 seg1 = self._segments[i] 948 seg2 = othersegs[i] 949 loc1 = seg1.locator 950 loc2 = seg2.locator 951 952 if self.parent._my_block_manager().is_bufferblock(loc1): 953 loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator() 954 955 if other.parent._my_block_manager().is_bufferblock(loc2): 956 loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator() 957 958 if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or 959 seg1.range_start != seg2.range_start or 960 seg1.range_size != seg2.range_size or 961 seg1.segment_offset != seg2.segment_offset): 962 return False 963 964 return True
965
966 - def __ne__(self, other):
967 return not self.__eq__(other)
968 969 @synchronized
970 - def set_segments(self, segs):
971 self._segments = segs
972 973 @synchronized
974 - def set_committed(self, value=True):
975 """Set committed flag. 976 977 If value is True, set committed to be True. 978 979 If value is False, set committed to be False for this and all parents. 980 """ 981 if value == self._committed: 982 return 983 self._committed = value 984 if self._committed is False and self.parent is not None: 985 self.parent.set_committed(False)
986 987 @synchronized
988 - def committed(self):
989 """Get whether this is committed or not.""" 990 return self._committed
991 992 @synchronized
993 - def add_writer(self, writer):
994 """Add an ArvadosFileWriter reference to the list of writers""" 995 if isinstance(writer, ArvadosFileWriter): 996 self._writers.add(writer)
997 998 @synchronized
999 - def remove_writer(self, writer, flush):
1000 """ 1001 Called from ArvadosFileWriter.close(). Remove a writer reference from the list 1002 and do some block maintenance tasks. 1003 """ 1004 self._writers.remove(writer) 1005 1006 if flush or self.size() > config.KEEP_BLOCK_SIZE // 2: 1007 # File writer closed, not small enough for repacking 1008 self.flush() 1009 elif self.closed(): 1010 # All writers closed and size is adequate for repacking 1011 self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
1012
1013 - def closed(self):
1014 """ 1015 Get whether this is closed or not. When the writers list is empty, the file 1016 is supposed to be closed. 1017 """ 1018 return len(self._writers) == 0
1019 1020 @must_be_writable 1021 @synchronized
1022 - def truncate(self, size):
1023 """Shrink or expand the size of the file. 1024 1025 If `size` is less than the size of the file, the file contents after 1026 `size` will be discarded. If `size` is greater than the current size 1027 of the file, it will be filled with zero bytes. 1028 1029 """ 1030 if size < self.size(): 1031 new_segs = [] 1032 for r in self._segments: 1033 range_end = r.range_start+r.range_size 1034 if r.range_start >= size: 1035 # segment is past the trucate size, all done 1036 break 1037 elif size < range_end: 1038 nr = Range(r.locator, r.range_start, size - r.range_start, 0) 1039 nr.segment_offset = r.segment_offset 1040 new_segs.append(nr) 1041 break 1042 else: 1043 new_segs.append(r) 1044 1045 self._segments = new_segs 1046 self.set_committed(False) 1047 elif size > self.size(): 1048 padding = self.parent._my_block_manager().get_padding_block() 1049 diff = size - self.size() 1050 while diff > config.KEEP_BLOCK_SIZE: 1051 self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0)) 1052 diff -= config.KEEP_BLOCK_SIZE 1053 if diff > 0: 1054 self._segments.append(Range(padding.blockid, self.size(), diff, 0)) 1055 self.set_committed(False) 1056 else: 1057 # size == self.size() 1058 pass
1059
1060 - def readfrom(self, offset, size, num_retries, exact=False):
1061 """Read up to `size` bytes from the file starting at `offset`. 1062 1063 :exact: 1064 If False (default), return less data than requested if the read 1065 crosses a block boundary and the next block isn't cached. If True, 1066 only return less data than requested when hitting EOF. 1067 """ 1068 1069 with self.lock: 1070 if size == 0 or offset >= self.size(): 1071 return b'' 1072 readsegs = locators_and_ranges(self._segments, offset, size) 1073 prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32) 1074 1075 locs = set() 1076 data = [] 1077 for lr in readsegs: 1078 block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact)) 1079 if block: 1080 blockview = memoryview(block) 1081 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes()) 1082 locs.add(lr.locator) 1083 else: 1084 break 1085 1086 for lr in prefetch: 1087 if lr.locator not in locs: 1088 self.parent._my_block_manager().block_prefetch(lr.locator) 1089 locs.add(lr.locator) 1090 1091 return b''.join(data)
1092 1093 @must_be_writable 1094 @synchronized
1095 - def writeto(self, offset, data, num_retries):
1096 """Write `data` to the file starting at `offset`. 1097 1098 This will update existing bytes and/or extend the size of the file as 1099 necessary. 1100 1101 """ 1102 if not isinstance(data, bytes) and not isinstance(data, memoryview): 1103 data = data.encode() 1104 if len(data) == 0: 1105 return 1106 1107 if offset > self.size(): 1108 self.truncate(offset) 1109 1110 if len(data) > config.KEEP_BLOCK_SIZE: 1111 # Chunk it up into smaller writes 1112 n = 0 1113 dataview = memoryview(data) 1114 while n < len(data): 1115 self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries) 1116 n += config.KEEP_BLOCK_SIZE 1117 return 1118 1119 self.set_committed(False) 1120 1121 if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE: 1122 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) 1123 1124 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: 1125 self._current_bblock.repack_writes() 1126 if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: 1127 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False) 1128 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) 1129 1130 self._current_bblock.append(data) 1131 1132 replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data)) 1133 1134 self.parent.notify(WRITE, self.parent, self.name, (self, self)) 1135 1136 return len(data)
1137 1138 @synchronized
1139 - def flush(self, sync=True, num_retries=0):
1140 """Flush the current bufferblock to Keep. 1141 1142 :sync: 1143 If True, commit block synchronously, wait until buffer block has been written. 1144 If False, commit block asynchronously, return immediately after putting block into 1145 the keep put queue. 1146 """ 1147 if self.committed(): 1148 return 1149 1150 if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED: 1151 if self._current_bblock.state() == _BufferBlock.WRITABLE: 1152 self._current_bblock.repack_writes() 1153 if self._current_bblock.state() != _BufferBlock.DELETED: 1154 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync) 1155 1156 if sync: 1157 to_delete = set() 1158 for s in self._segments: 1159 bb = self.parent._my_block_manager().get_bufferblock(s.locator) 1160 if bb: 1161 if bb.state() != _BufferBlock.COMMITTED: 1162 self.parent._my_block_manager().commit_bufferblock(bb, sync=True) 1163 to_delete.add(s.locator) 1164 s.locator = bb.locator() 1165 for s in to_delete: 1166 # Don't delete the bufferblock if it's owned by many files. It'll be 1167 # deleted after all of its owners are flush()ed. 1168 if self.parent._my_block_manager().get_bufferblock(s).owner is self: 1169 self.parent._my_block_manager().delete_bufferblock(s) 1170 1171 self.parent.notify(MOD, self.parent, self.name, (self, self))
1172 1173 @must_be_writable 1174 @synchronized
1175 - def add_segment(self, blocks, pos, size):
1176 """Add a segment to the end of the file. 1177 1178 `pos` and `offset` reference a section of the stream described by 1179 `blocks` (a list of Range objects) 1180 1181 """ 1182 self._add_segment(blocks, pos, size)
1183
1184 - def _add_segment(self, blocks, pos, size):
1185 """Internal implementation of add_segment.""" 1186 self.set_committed(False) 1187 for lr in locators_and_ranges(blocks, pos, size): 1188 last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) 1189 r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) 1190 self._segments.append(r)
1191 1192 @synchronized
1193 - def size(self):
1194 """Get the file size.""" 1195 if self._segments: 1196 n = self._segments[-1] 1197 return n.range_start + n.range_size 1198 else: 1199 return 0
1200 1201 @synchronized
1202 - def manifest_text(self, stream_name=".", portable_locators=False, 1203 normalize=False, only_committed=False):
1204 buf = "" 1205 filestream = [] 1206 for segment in self._segments: 1207 loc = segment.locator 1208 if self.parent._my_block_manager().is_bufferblock(loc): 1209 if only_committed: 1210 continue 1211 loc = self.parent._my_block_manager().get_bufferblock(loc).locator() 1212 if portable_locators: 1213 loc = KeepLocator(loc).stripped() 1214 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 1215 segment.segment_offset, segment.range_size)) 1216 buf += ' '.join(normalize_stream(stream_name, {self.name: filestream})) 1217 buf += "\n" 1218 return buf
1219 1220 @must_be_writable 1221 @synchronized
1222 - def _reparent(self, newparent, newname):
1223 self.set_committed(False) 1224 self.flush(sync=True) 1225 self.parent.remove(self.name) 1226 self.parent = newparent 1227 self.name = newname 1228 self.lock = self.parent.root_collection().lock
1229
1230 1231 -class ArvadosFileReader(ArvadosFileReaderBase):
1232 """Wraps ArvadosFile in a file-like object supporting reading only. 1233 1234 Be aware that this class is NOT thread safe as there is no locking around 1235 updating file pointer. 1236 1237 """ 1238
1239 - def __init__(self, arvadosfile, mode="r", num_retries=None):
1240 super(ArvadosFileReader, self).__init__(arvadosfile.name, mode=mode, num_retries=num_retries) 1241 self.arvadosfile = arvadosfile
1242
1243 - def size(self):
1244 return self.arvadosfile.size()
1245
1246 - def stream_name(self):
1247 return self.arvadosfile.parent.stream_name()
1248 1249 @_FileLikeObjectBase._before_close 1250 @retry_method
1251 - def read(self, size=None, num_retries=None):
1252 """Read up to `size` bytes from the file and return the result. 1253 1254 Starts at the current file position. If `size` is None, read the 1255 entire remainder of the file. 1256 """ 1257 if size is None: 1258 data = [] 1259 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) 1260 while rd: 1261 data.append(rd) 1262 self._filepos += len(rd) 1263 rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries) 1264 return b''.join(data) 1265 else: 1266 data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True) 1267 self._filepos += len(data) 1268 return data
1269 1270 @_FileLikeObjectBase._before_close 1271 @retry_method
1272 - def readfrom(self, offset, size, num_retries=None):
1273 """Read up to `size` bytes from the stream, starting at the specified file offset. 1274 1275 This method does not change the file position. 1276 """ 1277 return self.arvadosfile.readfrom(offset, size, num_retries)
1278
1279 - def flush(self):
1280 pass
1281
1282 1283 -class ArvadosFileWriter(ArvadosFileReader):
1284 """Wraps ArvadosFile in a file-like object supporting both reading and writing. 1285 1286 Be aware that this class is NOT thread safe as there is no locking around 1287 updating file pointer. 1288 1289 """ 1290
1291 - def __init__(self, arvadosfile, mode, num_retries=None):
1292 super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries) 1293 self.arvadosfile.add_writer(self)
1294
1295 - def writable(self):
1296 return True
1297 1298 @_FileLikeObjectBase._before_close 1299 @retry_method
1300 - def write(self, data, num_retries=None):
1301 if self.mode[0] == "a": 1302 self._filepos = self.size() 1303 self.arvadosfile.writeto(self._filepos, data, num_retries) 1304 self._filepos += len(data) 1305 return len(data)
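Hedged append-mode sketch (assumes a configured Arvados client; the file name is illustrative):

    import arvados.collection

    c = arvados.collection.Collection()
    with c.open('log.txt', 'ab') as w:    # 'a': every write() starts at end of file
        w.write(b'first line\n')
    with c.open('log.txt', 'ab') as w:
        w.write(b'second line\n')         # appended after the existing content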
1306 1307 @_FileLikeObjectBase._before_close 1308 @retry_method
1309 - def writelines(self, seq, num_retries=None):
1310 for s in seq: 1311 self.write(s, num_retries=num_retries)
1312 1313 @_FileLikeObjectBase._before_close
1314 - def truncate(self, size=None):
1315 if size is None: 1316 size = self._filepos 1317 self.arvadosfile.truncate(size)
1318 1319 @_FileLikeObjectBase._before_close
1320 - def flush(self):
1321 self.arvadosfile.flush()
1322
1323 - def close(self, flush=True):
1324 if not self.closed: 1325 self.arvadosfile.remove_writer(self, flush) 1326 super(ArvadosFileWriter, self).close()
1327