from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import listitems, listvalues
standard_library.install_aliases()
from builtins import range
from builtins import object
import bz2
import collections
import copy
import errno
import functools
import hashlib
import logging
import os
import queue
import re
import sys
import threading
import uuid
import zlib

from . import config
from .errors import KeepWriteError, AssertionError, ArgumentError
from .keep import KeepLocator
from ._normalize_stream import normalize_stream
from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
from .retry import retry_method

MOD = "mod"
WRITE = "write"

_logger = logging.getLogger('arvados.arvfile')


def split(path):
    """split(path) -> streamname, filename

    Separate the stream name and file name in a /-separated stream path and
    return a tuple (stream_name, file_name).  If no stream name is available,
    assume '.'.

    """
    try:
        stream_name, file_name = path.rsplit('/', 1)
    except ValueError:  # No / in string
        stream_name, file_name = '.', path
    return stream_name, file_name

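# Illustrative usage of split() (a sketch only; the example paths are
# hypothetical, not part of the original module):
#
#     split("data/reads/sample1.fastq")   # -> ("data/reads", "sample1.fastq")
#     split("sample1.fastq")              # -> (".", "sample1.fastq")
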

class UnownedBlockError(Exception):
    """Raised when there's a writable block without an owner on the BlockManager."""
    pass


class _FileLikeObjectBase(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
        self.closed = False

    @staticmethod
    def _before_close(orig_func):
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            self.close()
        except Exception:
            if exc_type is None:
                raise

    def close(self):
        self.closed = True


class ArvadosFileReaderBase(_FileLikeObjectBase):
    def __init__(self, name, mode, num_retries=None):
        super(ArvadosFileReaderBase, self).__init__(name, mode)
        self._binary = 'b' in mode
        if sys.version_info >= (3, 0) and not self._binary:
            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
        self._filepos = 0
        self.num_retries = num_retries
        self._readline_cache = (None, None)

    def __iter__(self):
        while True:
            data = self.readline()
            if not data:
                break
            yield data

    def decompressed_name(self):
        return re.sub(r'\.(bz2|gz)$', '', self.name)

    @_FileLikeObjectBase._before_close
    def seek(self, pos, whence=os.SEEK_SET):
        if whence == os.SEEK_CUR:
            pos += self._filepos
        elif whence == os.SEEK_END:
            pos += self.size()
        if pos < 0:
            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
        self._filepos = pos
        return self._filepos

    def tell(self):
        return self._filepos

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall(self, size=2**20, num_retries=None):
        while True:
            data = self.read(size, num_retries=num_retries)
            if len(data) == 0:
                break
            yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readline(self, size=float('inf'), num_retries=None):
        cache_pos, cache_data = self._readline_cache
        if self.tell() == cache_pos:
            data = [cache_data]
            self._filepos += len(cache_data)
        else:
            data = [b'']
        data_size = len(data[-1])
        while (data_size < size) and (b'\n' not in data[-1]):
            next_read = self.read(2 ** 20, num_retries=num_retries)
            if not next_read:
                break
            data.append(next_read)
            data_size += len(next_read)
        data = b''.join(data)
        try:
            nextline_index = data.index(b'\n') + 1
        except ValueError:
            nextline_index = len(data)
        nextline_index = min(nextline_index, size)
        self._filepos -= len(data) - nextline_index
        self._readline_cache = (self.tell(), data[nextline_index:])
        return data[:nextline_index].decode()

    @_FileLikeObjectBase._before_close
    @retry_method
    def decompress(self, decompress, size, num_retries=None):
        for segment in self.readall(size, num_retries=num_retries):
            data = decompress(segment)
            if data:
                yield data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readall_decompressed(self, size=2**20, num_retries=None):
        self.seek(0)
        if self.name.endswith('.bz2'):
            dc = bz2.BZ2Decompressor()
            return self.decompress(dc.decompress, size,
                                   num_retries=num_retries)
        elif self.name.endswith('.gz'):
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
                                   size, num_retries=num_retries)
        else:
            return self.readall(size, num_retries=num_retries)

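    # Illustrative use of readall_decompressed() (a sketch only; `reader` and
    # `process` are hypothetical, but the transparent .gz/.bz2 handling above
    # is what the method provides):
    #
    #     for chunk in reader.readall_decompressed():
    #         process(chunk)   # receives decompressed bytes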
    @_FileLikeObjectBase._before_close
    @retry_method
    def readlines(self, sizehint=float('inf'), num_retries=None):
        data = []
        data_size = 0
        for s in self.readall(num_retries=num_retries):
            data.append(s)
            data_size += len(s)
            if data_size >= sizehint:
                break
        return b''.join(data).decode().splitlines(True)

    def size(self):
        raise IOError(errno.ENOSYS, "Not implemented")

    def read(self, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")

    def readfrom(self, start, size, num_retries=None):
        raise IOError(errno.ENOSYS, "Not implemented")


class StreamFileReader(ArvadosFileReaderBase):
    class _NameAttribute(str):
        # The Python file API exposes a plain .name attribute, while older
        # SDK code calls name() as a method; this str subclass is callable,
        # so it satisfies both conventions.
        def __call__(self):
            return self

    def __init__(self, stream, segments, name):
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
        self._stream = stream
        self.segments = segments

    def stream_name(self):
        return self._stream.name()

    def size(self):
        n = self.segments[-1]
        return n.range_start + n.range_size

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at the current file position"""
        if size == 0:
            return b''

        data = b''
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
        if available_chunks:
            lr = available_chunks[0]
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
                                         lr.segment_size,
                                         num_retries=num_retries)

        self._filepos += len(data)
        return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''

        data = []
        for lr in locators_and_ranges(self.segments, start, size):
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
                                              num_retries=num_retries))
        return b''.join(data)

    def as_manifest(self):
        segs = []
        for r in self.segments:
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"


def synchronized(orig_func):
    @functools.wraps(orig_func)
    def synchronized_wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return synchronized_wrapper


class StateChangeError(Exception):
    def __init__(self, message, state, nextstate):
        super(StateChangeError, self).__init__(message)
        self.state = state
        self.nextstate = nextstate


class _BufferBlock(object):
    """A stand-in for a Keep block that is in the process of being written.

    Writers can append to it, get the size, and compute the Keep locator.
    There are three valid states:

    WRITABLE
      Can append to block.

    PENDING
      Block is in the process of being uploaded to Keep, append is an error.

    COMMITTED
      The block has been written to Keep, its internal buffer has been
      released, fetching the block will fetch it via keep client (since we
      discarded the internal copy), and identifiers referring to the BufferBlock
      can be replaced with the block locator.

    """

    WRITABLE = 0
    PENDING = 1
    COMMITTED = 2
    ERROR = 3
    DELETED = 4

    def __init__(self, blockid, starting_capacity, owner):
        """
        :blockid:
          the identifier for this block

        :starting_capacity:
          the initial buffer capacity

        :owner:
          ArvadosFile that owns this block

        """
        self.blockid = blockid
        self.buffer_block = bytearray(starting_capacity)
        self.buffer_view = memoryview(self.buffer_block)
        self.write_pointer = 0
        self._state = _BufferBlock.WRITABLE
        self._locator = None
        self.owner = owner
        self.lock = threading.Lock()
        self.wait_for_commit = threading.Event()
        self.error = None

    @synchronized
    def append(self, data):
        """Append some data to the buffer.

        Only valid if the block is in WRITABLE state.  Implements an expanding
        buffer, doubling capacity as needed to accommodate all the data.

        """
        if self._state == _BufferBlock.WRITABLE:
            if not isinstance(data, bytes) and not isinstance(data, memoryview):
                data = data.encode()
            while (self.write_pointer+len(data)) > len(self.buffer_block):
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer_block
                self.buffer_view = memoryview(self.buffer_block)
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
            self.write_pointer += len(data)
            self._locator = None
        else:
            raise AssertionError("Buffer block is not writable")

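    # Capacity growth sketch: starting from the default 2**14 byte buffer,
    # appending 100 KiB doubles the bytearray 16 KiB -> 32 -> 64 -> 128 KiB,
    # so the amortized cost of append() stays linear in the data written.
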
    STATE_TRANSITIONS = frozenset([
        (WRITABLE, PENDING),
        (PENDING, COMMITTED),
        (PENDING, ERROR),
        (ERROR, PENDING)])

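    # State machine sketch (derived from STATE_TRANSITIONS above):
    #
    #     WRITABLE -> PENDING -> COMMITTED
    #                 PENDING -> ERROR
    #                 ERROR   -> PENDING   (a failed upload may be retried)
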
    @synchronized
    def set_state(self, nextstate, val=None):
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
        self._state = nextstate

        if self._state == _BufferBlock.PENDING:
            self.wait_for_commit.clear()

        if self._state == _BufferBlock.COMMITTED:
            self._locator = val
            self.buffer_view = None
            self.buffer_block = None
            self.wait_for_commit.set()

        if self._state == _BufferBlock.ERROR:
            self.error = val
            self.wait_for_commit.set()

    @synchronized
    def state(self):
        return self._state

    def size(self):
        """The amount of data written to the buffer."""
        return self.write_pointer

    @synchronized
    def locator(self):
        """The Keep locator for this buffer's contents."""
        if self._locator is None:
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
        return self._locator

    @synchronized
    def clone(self, new_blockid, owner):
        if self._state == _BufferBlock.COMMITTED:
            raise AssertionError("Cannot duplicate committed buffer block")
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
        bufferblock.append(self.buffer_view[0:self.size()])
        return bufferblock

    @synchronized
    def clear(self):
        self._state = _BufferBlock.DELETED
        self.owner = None
        self.buffer_block = None
        self.buffer_view = None

    @synchronized
    def repack_writes(self):
        """Optimize buffer block by repacking segments in file sequence.

        When the client makes random writes, they appear in the buffer block in
        the sequence they were written rather than the sequence they appear in
        the file.  This makes for inefficient, fragmented manifests.  Attempt
        to optimize by repacking writes in file sequence.

        """
        if self._state != _BufferBlock.WRITABLE:
            raise AssertionError("Cannot repack non-writable block")

        segs = self.owner.segments()

        # Collect the segments that reference this buffer block.
        bufferblock_segs = [s for s in segs if s.locator == self.blockid]

        # Total the data referenced by those segments (could be smaller than
        # the buffer size if a portion of the file was written and then
        # overwritten).
        write_total = sum([s.range_size for s in bufferblock_segs])

        if write_total < self.size() or len(bufferblock_segs) > 1:
            # If there's more than one segment referencing this block, or some
            # of the buffer is dead (overwritten), repack the live ranges into
            # a fresh buffer in file order.
            contents = self.buffer_view[0:self.write_pointer].tobytes()
            new_bb = _BufferBlock(None, write_total, None)
            for t in bufferblock_segs:
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                t.segment_offset = new_bb.size() - t.range_size

            self.buffer_block = new_bb.buffer_block
            self.buffer_view = new_bb.buffer_view
            self.write_pointer = new_bb.write_pointer
            self._locator = None
            new_bb.clear()
            self.owner.set_segments(segs)

    def __repr__(self):
        return "<BufferBlock %s>" % (self.blockid)
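
# Illustrative _BufferBlock lifecycle (a sketch only; the block id and the
# committed locator below are hypothetical example values):
#
#     bb = _BufferBlock("example-id", starting_capacity=2**14, owner=None)
#     bb.append(b"hello")                 # WRITABLE: data accumulates
#     bb.set_state(_BufferBlock.PENDING)  # upload in flight, appends now fail
#     bb.set_state(_BufferBlock.COMMITTED, "5d41402abc4b2a76b9719d911017c592+5")
#     bb.locator()                        # returns the committed Keep locator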


class NoopLock(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def acquire(self, blocking=False):
        pass

    def release(self):
        pass


def must_be_writable(orig_func):
    @functools.wraps(orig_func)
    def must_be_writable_wrapper(self, *args, **kwargs):
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read-only.")
        return orig_func(self, *args, **kwargs)
    return must_be_writable_wrapper


class _BlockManager(object):
    """BlockManager handles buffer blocks.

    Also handles background block uploads, and background block prefetch for a
    Collection of ArvadosFiles.

    """

    DEFAULT_PUT_THREADS = 2
    DEFAULT_GET_THREADS = 2

    def __init__(self, keep, copies=None, put_threads=None):
        """keep: KeepClient object to use"""
        self._keep = keep
        self._bufferblocks = collections.OrderedDict()
        self._put_queue = None
        self._put_threads = None
        self._prefetch_queue = None
        self._prefetch_threads = None
        self.lock = threading.Lock()
        self.prefetch_enabled = True
        if put_threads:
            self.num_put_threads = put_threads
        else:
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
        self.copies = copies
        self._pending_write_size = 0
        self.threads_lock = threading.Lock()
        self.padding_block = None

    @synchronized
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        """Allocate a new, empty bufferblock in WRITABLE state and return it.

        :blockid:
          optional block identifier, otherwise one will be automatically assigned

        :starting_capacity:
          optional capacity, otherwise will use default capacity

        :owner:
          ArvadosFile that owns this block

        """
        return self._alloc_bufferblock(blockid, starting_capacity, owner)

    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
        if blockid is None:
            blockid = str(uuid.uuid4())
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def dup_block(self, block, owner):
        """Create a new bufferblock initialized with the content of an existing bufferblock.

        :block:
          the buffer block to copy.

        :owner:
          ArvadosFile that owns the new block

        """
        new_blockid = str(uuid.uuid4())
        bufferblock = block.clone(new_blockid, owner)
        self._bufferblocks[bufferblock.blockid] = bufferblock
        return bufferblock

    @synchronized
    def is_bufferblock(self, locator):
        return locator in self._bufferblocks

    def _commit_bufferblock_worker(self):
        """Background uploader thread."""

        while True:
            try:
                bufferblock = self._put_queue.get()
                if bufferblock is None:
                    return

                if self.copies is None:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                else:
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                bufferblock.set_state(_BufferBlock.ERROR, e)
            finally:
                if self._put_queue is not None:
                    self._put_queue.task_done()

    def start_put_threads(self):
        with self.threads_lock:
            if self._put_threads is None:
                # Start uploader threads.

                # If we don't limit the Queue size, the upload queue can
                # quickly grow to take up gigabytes of RAM if the writing
                # process is generating data more quickly than it can be
                # sent to the Keep servers.
                self._put_queue = queue.Queue(maxsize=2)

                self._put_threads = []
                for i in range(0, self.num_put_threads):
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
                    self._put_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self._keep.get(b)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    @synchronized
    def start_get_threads(self):
        if self._prefetch_threads is None:
            self._prefetch_queue = queue.Queue()
            self._prefetch_threads = []
            for i in range(0, self.num_get_threads):
                thread = threading.Thread(target=self._block_prefetch_worker)
                self._prefetch_threads.append(thread)
                thread.daemon = True
                thread.start()

    @synchronized
    def stop_threads(self):
        """Shut down and wait for background upload and download threads to finish."""

        if self._put_threads is not None:
            for t in self._put_threads:
                self._put_queue.put(None)
            for t in self._put_threads:
                t.join()
        self._put_threads = None
        self._put_queue = None

        if self._prefetch_threads is not None:
            for t in self._prefetch_threads:
                self._prefetch_queue.put(None)
            for t in self._prefetch_threads:
                t.join()
        self._prefetch_threads = None
        self._prefetch_queue = None
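
    # Sketch: _BlockManager is normally used as a context manager so the
    # background worker threads are joined on exit (see __exit__ below).
    # `keep_client` here is a hypothetical KeepClient instance:
    #
    #     with _BlockManager(keep_client) as bm:
    #         ...  # allocate and commit buffer blocks
    #     # stop_threads() has been called automatically at this point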

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_threads()

    @synchronized
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
        """Packs small blocks together before uploading"""

        self._pending_write_size += closed_file_size

        # Check if there are enough small blocks for filling up one in full
        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
            return

        # Search blocks ready for getting packed together before being
        # committed to Keep.
        # A WRITABLE block always has an owner.
        # A WRITABLE block with its owner.closed() implies that its
        # size is <= KEEP_BLOCK_SIZE/2.
        try:
            small_blocks = [b for b in listvalues(self._bufferblocks)
                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
        except AttributeError:
            # Writable blocks without owner shouldn't exist.
            raise UnownedBlockError()

        if len(small_blocks) <= 1:
            # Not enough small blocks for repacking
            return

        for bb in small_blocks:
            bb.repack_writes()

        # Update the pending write size count with its true value, just in case
        # some small file was opened, written and closed several times.
        self._pending_write_size = sum([b.size() for b in small_blocks])

        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
            return

        new_bb = self._alloc_bufferblock()
        new_bb.owner = []
        files = []
        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
            bb = small_blocks.pop(0)
            new_bb.owner.append(bb.owner)
            self._pending_write_size -= bb.size()
            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
            files.append((bb, new_bb.write_pointer - bb.size()))

        self.commit_bufferblock(new_bb, sync=sync)

        for bb, new_bb_segment_offset in files:
            newsegs = bb.owner.segments()
            for s in newsegs:
                if s.locator == bb.blockid:
                    s.locator = new_bb.blockid
                    s.segment_offset = new_bb_segment_offset+s.segment_offset
            bb.owner.set_segments(newsegs)
            self._delete_bufferblock(bb.blockid)

    def commit_bufferblock(self, block, sync):
        """Initiate a background upload of a bufferblock.

        :block:
          The block object to upload

        :sync:
          If `sync` is True, upload the block synchronously.
          If `sync` is False, upload the block asynchronously.  This will
          return immediately unless the upload queue is at capacity, in
          which case it will wait on an upload queue slot.

        """
        try:
            # Mark the block as PENDING so no more appends are allowed.
            block.set_state(_BufferBlock.PENDING)
        except StateChangeError as e:
            if e.state == _BufferBlock.PENDING:
                if sync:
                    block.wait_for_commit.wait()
                else:
                    return
            if block.state() == _BufferBlock.COMMITTED:
                return
            elif block.state() == _BufferBlock.ERROR:
                raise block.error
            else:
                raise

        if sync:
            try:
                if self.copies is None:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
                else:
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                block.set_state(_BufferBlock.COMMITTED, loc)
            except Exception as e:
                block.set_state(_BufferBlock.ERROR, e)
                raise
        else:
            self.start_put_threads()
            self._put_queue.put(block)

    @synchronized
    def get_bufferblock(self, locator):
        return self._bufferblocks.get(locator)

    @synchronized
    def get_padding_block(self):
        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
        when using truncate() to extend the size of a file.

        For reference (and possible future optimization), the md5sum of the
        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864

        """

        if self.padding_block is None:
            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
            self.commit_bufferblock(self.padding_block, False)
        return self.padding_block
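
    # Worked example (sketch): extending a file by 150 MiB via truncate()
    # with a 64 MiB KEEP_BLOCK_SIZE appends two full 64 MiB padding segments
    # plus one 22 MiB segment, all referencing this single zero-filled block.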

    @synchronized
    def delete_bufferblock(self, locator):
        self._delete_bufferblock(locator)

    def _delete_bufferblock(self, locator):
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]

    def get_block_contents(self, locator, num_retries, cache_only=False):
        """Fetch a block.

        First checks to see if the locator is a BufferBlock and return that, if
        not, passes the request through to KeepClient.get().

        """
        with self.lock:
            if locator in self._bufferblocks:
                bufferblock = self._bufferblocks[locator]
                if bufferblock.state() != _BufferBlock.COMMITTED:
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                else:
                    locator = bufferblock._locator
        if cache_only:
            return self._keep.get_from_cache(locator)
        else:
            return self._keep.get(locator, num_retries=num_retries)

    def commit_all(self):
        """Commit all outstanding buffer blocks.

        This is a synchronous call, and will not return until all buffer blocks
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.

        """
        self.repack_small_blocks(force=True, sync=True)

        with self.lock:
            items = listitems(self._bufferblocks)

        for k,v in items:
            if v.state() != _BufferBlock.COMMITTED and v.owner:
                # Ignore blocks with a list of owners; those are already being
                # committed asynchronously by repack_small_blocks().
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=False)

        with self.lock:
            if self._put_queue is not None:
                self._put_queue.join()

                err = []
                for k,v in items:
                    if v.state() == _BufferBlock.ERROR:
                        err.append((v.locator(), v.error))
                if err:
                    raise KeepWriteError("Error writing some blocks", err, label="block")

        for k,v in items:
            # Flush the remaining blocks synchronously so the owners record
            # their final locators and the buffers can be thrown away.
            if v.owner:
                if isinstance(v.owner, ArvadosFile):
                    v.owner.flush(sync=True)
                elif isinstance(v.owner, list) and len(v.owner) > 0:
                    # This bufferblock is referenced by many files as a result
                    # of repacking small blocks, so don't delete it when flushing
                    # its owners, just do it after flushing them all.
                    for owner in v.owner:
                        owner.flush(sync=True)
                    self.delete_bufferblock(k)

    def block_prefetch(self, locator):
        """Initiate a background download of a block.

        This assumes that the underlying KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.

        """

        if not self.prefetch_enabled:
            return

        if self._keep.get_from_cache(locator) is not None:
            return

        with self.lock:
            if locator in self._bufferblocks:
                return

        self.start_get_threads()
        self._prefetch_queue.put(locator)
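
    # Illustrative call pattern (sketch): readers queue upcoming locators so
    # the KeepClient cache is warm by the time a sequential read reaches
    # them.  The locator below is a hypothetical example value:
    #
    #     bm.block_prefetch("5d41402abc4b2a76b9719d911017c592+5")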


class ArvadosFile(object):
    """Represent a file in a Collection.

    ArvadosFile manages the underlying representation of a file in Keep as a
    sequence of segments spanning a set of blocks, and implements random
    read/write access.

    This object may be accessed from multiple threads.

    """

    __slots__ = ('parent', 'name', '_writers', '_committed',
                 '_segments', 'lock', '_current_bblock', 'fuse_entry')

    def __init__(self, parent, name, stream=[], segments=[]):
        """
        ArvadosFile constructor.

        :stream:
          a list of Range objects representing a block stream

        :segments:
          a list of Range objects representing segments
        """
        self.parent = parent
        self.name = name
        self._writers = set()
        self._committed = False
        self._segments = []
        self.lock = parent.root_collection().lock
        for s in segments:
            self._add_segment(stream, s.locator, s.range_size)
        self._current_bblock = None

    def writable(self):
        return self.parent.writable()

    @synchronized
    def permission_expired(self, as_of_dt=None):
        """Returns True if any of the segment's locators is expired"""
        for r in self._segments:
            if KeepLocator(r.locator).permission_expired(as_of_dt):
                return True
        return False

    @synchronized
    def segments(self):
        return copy.copy(self._segments)

    @synchronized
    def clone(self, new_parent, new_name):
        """Make a copy of this file."""
        cp = ArvadosFile(new_parent, new_name)
        cp.replace_contents(self)
        return cp

    @must_be_writable
    @synchronized
    def replace_contents(self, other):
        """Replace segments of this file with segments from another `ArvadosFile` object."""

        map_loc = {}
        self._segments = []
        for other_segment in other.segments():
            new_loc = other_segment.locator
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
                if other_segment.locator not in map_loc:
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
                    if bufferblock.state() != _BufferBlock.WRITABLE:
                        map_loc[other_segment.locator] = bufferblock.locator()
                    else:
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
                new_loc = map_loc[other_segment.locator]

            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))

        self.set_committed(False)

    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, ArvadosFile):
            return False

        othersegs = other.segments()
        with self.lock:
            if len(self._segments) != len(othersegs):
                return False
            for i in range(0, len(othersegs)):
                seg1 = self._segments[i]
                seg2 = othersegs[i]
                loc1 = seg1.locator
                loc2 = seg2.locator

                if self.parent._my_block_manager().is_bufferblock(loc1):
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()

                if other.parent._my_block_manager().is_bufferblock(loc2):
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()

                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
                    seg1.range_start != seg2.range_start or
                    seg1.range_size != seg2.range_size or
                    seg1.segment_offset != seg2.segment_offset):
                    return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def set_segments(self, segs):
        self._segments = segs

    @synchronized
    def set_committed(self, value=True):
        """Set committed flag.

        If value is True, set committed to be True.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        self._committed = value
        if self._committed is False and self.parent is not None:
            self.parent.set_committed(False)

    @synchronized
    def committed(self):
        """Get whether this is committed or not."""
        return self._committed

    @synchronized
    def add_writer(self, writer):
        """Add an ArvadosFileWriter reference to the list of writers"""
        if isinstance(writer, ArvadosFileWriter):
            self._writers.add(writer)

    @synchronized
    def remove_writer(self, writer, flush):
        """
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
        and do some block maintenance tasks.
        """
        self._writers.remove(writer)

        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
            # File writer closed, not small enough for repacking
            self.flush()
        elif self.closed():
            # All writers closed and size is adequate for repacking
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())

    def closed(self):
        """
        Get whether this is closed or not. When the writers list is empty, the file
        is supposed to be closed.
        """
        return len(self._writers) == 0

    @must_be_writable
    @synchronized
    def truncate(self, size):
        """Shrink or expand the size of the file.

        If `size` is less than the size of the file, the file contents after
        `size` will be discarded.  If `size` is greater than the current size
        of the file, it will be filled with zero bytes.

        """
        if size < self.size():
            new_segs = []
            for r in self._segments:
                range_end = r.range_start+r.range_size
                if r.range_start >= size:
                    # segment is past the truncate size, all done
                    break
                elif size < range_end:
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                    nr.segment_offset = r.segment_offset
                    new_segs.append(nr)
                    break
                else:
                    new_segs.append(r)

            self._segments = new_segs
            self.set_committed(False)
        elif size > self.size():
            padding = self.parent._my_block_manager().get_padding_block()
            diff = size - self.size()
            while diff > config.KEEP_BLOCK_SIZE:
                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
                diff -= config.KEEP_BLOCK_SIZE
            if diff > 0:
                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
            self.set_committed(False)
        else:
            # size == self.size()
            pass

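    # Worked example (sketch): for a 10-byte file, truncate(4) keeps only the
    # first 4 bytes of the first segment, while truncate(20) appends a
    # 10-byte Range referencing the shared zero block from get_padding_block().
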
    def readfrom(self, offset, size, num_retries, exact=False):
        """Read up to `size` bytes from the file starting at `offset`.

        :exact:
          If False (default), return less data than requested if the read
          crosses a block boundary and the next block isn't cached.  If True,
          only return less data than requested when hitting EOF.
        """

        with self.lock:
            if size == 0 or offset >= self.size():
                return b''
            readsegs = locators_and_ranges(self._segments, offset, size)
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)

        locs = set()
        data = []
        for lr in readsegs:
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
            if block:
                blockview = memoryview(block)
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
                locs.add(lr.locator)
            else:
                break

        for lr in prefetch:
            if lr.locator not in locs:
                self.parent._my_block_manager().block_prefetch(lr.locator)
                locs.add(lr.locator)

        return b''.join(data)

    @must_be_writable
    @synchronized
    def writeto(self, offset, data, num_retries):
        """Write `data` to the file starting at `offset`.

        This will update existing bytes and/or extend the size of the file as
        necessary.

        """
        if not isinstance(data, bytes) and not isinstance(data, memoryview):
            data = data.encode()
        if len(data) == 0:
            return

        if offset > self.size():
            self.truncate(offset)

        if len(data) > config.KEEP_BLOCK_SIZE:
            # Chunk it up into smaller writes
            n = 0
            dataview = memoryview(data)
            while n < len(data):
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
                n += config.KEEP_BLOCK_SIZE
            return

        self.set_committed(False)

        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
            self._current_bblock.repack_writes()
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)

        self._current_bblock.append(data)

        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))

        self.parent.notify(WRITE, self.parent, self.name, (self, self))

        return len(data)

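    # Worked example (sketch): writeto(0, data, ...) with 150 MiB of data and
    # a 64 MiB KEEP_BLOCK_SIZE recurses into three writes of 64, 64 and
    # 22 MiB, so no single buffer block ever exceeds the Keep block size.
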
    @synchronized
    def flush(self, sync=True, num_retries=0):
        """Flush the current bufferblock to Keep.

        :sync:
          If True, commit block synchronously, wait until buffer block has been written.
          If False, commit block asynchronously, return immediately after putting block into
          the keep put queue.
        """
        if self.committed():
            return

        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
                self._current_bblock.repack_writes()
            if self._current_bblock.state() != _BufferBlock.DELETED:
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)

        if sync:
            to_delete = set()
            for s in self._segments:
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                if bb:
                    if bb.state() != _BufferBlock.COMMITTED:
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                    to_delete.add(s.locator)
                    s.locator = bb.locator()
            for s in to_delete:
                # Don't delete the bufferblock if it's owned by many files.
                # It'll be deleted after all of its owners are flush()ed.
                if self.parent._my_block_manager().get_bufferblock(s).owner is self:
                    self.parent._my_block_manager().delete_bufferblock(s)

        self.parent.notify(MOD, self.parent, self.name, (self, self))

    @must_be_writable
    @synchronized
    def add_segment(self, blocks, pos, size):
        """Add a segment to the end of the file.

        `pos` and `offset` reference a section of the stream described by
        `blocks` (a list of Range objects)

        """
        self._add_segment(blocks, pos, size)

    def _add_segment(self, blocks, pos, size):
        """Internal implementation of add_segment."""
        self.set_committed(False)
        for lr in locators_and_ranges(blocks, pos, size):
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
            self._segments.append(r)

    @synchronized
    def size(self):
        """Get the file size."""
        if self._segments:
            n = self._segments[-1]
            return n.range_start + n.range_size
        else:
            return 0

    @synchronized
    def manifest_text(self, stream_name=".", portable_locators=False,
                      normalize=False, only_committed=False):
        buf = ""
        filestream = []
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                              segment.segment_offset, segment.range_size))
        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
        buf += "\n"
        return buf

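    # Illustrative output (sketch): a one-segment file named "count.txt" in
    # stream "." yields a manifest line such as
    #
    #     . 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt
    #
    # where the example locator and sizes are hypothetical.
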
    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush(sync=True)
        self.parent.remove(self.name)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class ArvadosFileReader(ArvadosFileReaderBase):
    """Wraps ArvadosFile in a file-like object supporting reading only.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode="r", num_retries=None):
        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode+"b", num_retries=num_retries)
        self.arvadosfile = arvadosfile

    def size(self):
        return self.arvadosfile.size()

    def stream_name(self):
        return self.arvadosfile.parent.stream_name()

    @_FileLikeObjectBase._before_close
    @retry_method
    def read(self, size=None, num_retries=None):
        """Read up to `size` bytes from the file and return the result.

        Starts at the current file position.  If `size` is None, read the
        entire remainder of the file.
        """
        if size is None:
            data = []
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            while rd:
                data.append(rd)
                self._filepos += len(rd)
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
            return b''.join(data)
        else:
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
            self._filepos += len(data)
            return data

    @_FileLikeObjectBase._before_close
    @retry_method
    def readfrom(self, offset, size, num_retries=None):
        """Read up to `size` bytes from the stream, starting at the specified file offset.

        This method does not change the file position.
        """
        return self.arvadosfile.readfrom(offset, size, num_retries)

    def flush(self):
        pass


class ArvadosFileWriter(ArvadosFileReader):
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.

    Be aware that this class is NOT thread safe as there is no locking around
    updating file pointer.

    """

    def __init__(self, arvadosfile, mode, num_retries=None):
        super(ArvadosFileWriter, self).__init__(arvadosfile, mode=mode, num_retries=num_retries)
        self.arvadosfile.add_writer(self)

    def writable(self):
        return True

    @_FileLikeObjectBase._before_close
    @retry_method
    def write(self, data, num_retries=None):
        if self.mode[0] == "a":
            self._filepos = self.size()
        self.arvadosfile.writeto(self._filepos, data, num_retries)
        self._filepos += len(data)
        return len(data)

    @_FileLikeObjectBase._before_close
    @retry_method
    def writelines(self, seq, num_retries=None):
        for s in seq:
            self.write(s, num_retries=num_retries)

    @_FileLikeObjectBase._before_close
    def truncate(self, size=None):
        if size is None:
            size = self._filepos
        self.arvadosfile.truncate(size)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.arvadosfile.flush()

    def close(self, flush=True):
        if not self.closed:
            self.arvadosfile.remove_writer(self, flush)
            super(ArvadosFileWriter, self).close()
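

# Illustrative end-to-end usage (a sketch only; the collection API lives
# outside this module and the file name is hypothetical):
#
#     with collection.open("stats.tsv", "wb") as f:   # an ArvadosFileWriter
#         f.write(b"lines\t42\n")
#     # close() removes the writer reference; small blocks may then be
#     # repacked and flushed to Keep by the owning collection's _BlockManager.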