arvados.stream
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import collections
import hashlib
import os
import re
import threading
import functools
import copy

from ._ranges import locators_and_ranges, Range
from .arvfile import StreamFileReader
from arvados.retry import retry_method
from arvados.keep import *
from . import config
from . import errors
from . import util
from ._normalize_stream import normalize_stream

class StreamReader(object):
    """Parse the tokens of one manifest stream and read its data.

    The first token is the stream name.  Every later token is either a
    data block locator (``<32 hex digits>+<size>`` with optional ``+hint``
    suffixes) or a file segment (``<position>:<size>:<name>``).
    """

    # Fixed: the replacement name used to be misspelled 'Collecttion'.
    @util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=10):
        """Build a reader from an iterable of manifest tokens.

        Arguments:

        * tokens --- the tokens of one stream, stream name first.
        * keep --- object whose get() fetches a block by locator.  When
          None, readfrom() creates a KeepClient on first use.
        * debug --- when true, print every token as it is parsed.
        * _empty --- not used by this constructor.
        * num_retries --- stored as self.num_retries.

        Raises errors.SyntaxError when a token after the stream name is
        neither a block locator nor a file segment.
        """
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        # Compile the two token patterns once instead of on every token.
        locator_re = re.compile(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$')
        segment_re = re.compile(r'^(\d+):(\d+):(\S+)')

        # Offset of the next data block within the concatenated stream.
        streamoffset = 0

        # parse stream
        for tok in tokens:
            if debug: print('tok', tok)
            if self._stream_name is None:
                # The first token is the stream name; \040 encodes a space.
                self._stream_name = tok.replace('\\040', ' ')
                continue

            s = locator_re.match(tok)
            if s:
                blocksize = int(s.group(1))
                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
                streamoffset += blocksize
                continue

            s = segment_re.search(tok)
            if s:
                pos = int(s.group(1))
                size = int(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
                else:
                    # A repeated name adds a segment after the file's current end.
                    filereader = self._files[name]
                    filereader.segments.append(Range(pos, filereader.size(), size))
                continue

            raise errors.SyntaxError("Invalid manifest format")

    def name(self):
        """Return the stream name with ``\\040`` escapes turned into spaces."""
        return self._stream_name

    def files(self):
        """Return the ordered mapping of file name to StreamFileReader."""
        return self._files

    def all_files(self):
        """Return a list of the stream's StreamFileReader objects."""
        return list(self._files.values())

    def size(self):
        """Return the end offset of the last data block, or 0 with no blocks."""
        if not self._data_locators:
            # Previously this raised IndexError for a stream without blocks.
            return 0
        n = self._data_locators[-1]
        return n.range_start + n.range_size

    def locators_and_ranges(self, range_start, range_size):
        """Return the block ranges covering the given span of the stream."""
        return locators_and_ranges(self._data_locators, range_start, range_size)

    @retry_method
    def _keepget(self, locator, num_retries=None):
        """Fetch one block from self._keep by locator."""
        return self._keep.get(locator, num_retries=num_retries)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''
        if self._keep is None:
            # No client was supplied to the constructor; create one now.
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for lr in locators_and_ranges(self._data_locators, start, size):
            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
        return b''.join(data)

    def manifest_text(self, strip=False):
        """Return the stream as one manifest line ending in a newline.

        The line holds the stream name, the block locators, then each file
        segment as ``locator:range_size:name``.  Spaces in names are
        written as ``\\040``.  With strip=True every block locator is cut
        down to its leading ``<md5>+<size>`` part.
        """
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d.locator for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'
class
StreamReader:
class StreamReader(object):
    """Parse the tokens of one manifest stream and read its data.

    The first token is the stream name.  Every later token is either a
    data block locator (``<32 hex digits>+<size>`` with optional ``+hint``
    suffixes) or a file segment (``<position>:<size>:<name>``).
    """

    # Fixed: the replacement name used to be misspelled 'Collecttion'.
    @util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=10):
        """Build a reader from an iterable of manifest tokens.

        Arguments:

        * tokens --- the tokens of one stream, stream name first.
        * keep --- object whose get() fetches a block by locator.  When
          None, readfrom() creates a KeepClient on first use.
        * debug --- when true, print every token as it is parsed.
        * _empty --- not used by this constructor.
        * num_retries --- stored as self.num_retries.

        Raises errors.SyntaxError when a token after the stream name is
        neither a block locator nor a file segment.
        """
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        # Compile the two token patterns once instead of on every token.
        locator_re = re.compile(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$')
        segment_re = re.compile(r'^(\d+):(\d+):(\S+)')

        # Offset of the next data block within the concatenated stream.
        streamoffset = 0

        # parse stream
        for tok in tokens:
            if debug: print('tok', tok)
            if self._stream_name is None:
                # The first token is the stream name; \040 encodes a space.
                self._stream_name = tok.replace('\\040', ' ')
                continue

            s = locator_re.match(tok)
            if s:
                blocksize = int(s.group(1))
                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
                streamoffset += blocksize
                continue

            s = segment_re.search(tok)
            if s:
                pos = int(s.group(1))
                size = int(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
                else:
                    # A repeated name adds a segment after the file's current end.
                    filereader = self._files[name]
                    filereader.segments.append(Range(pos, filereader.size(), size))
                continue

            raise errors.SyntaxError("Invalid manifest format")

    def name(self):
        """Return the stream name with ``\\040`` escapes turned into spaces."""
        return self._stream_name

    def files(self):
        """Return the ordered mapping of file name to StreamFileReader."""
        return self._files

    def all_files(self):
        """Return a list of the stream's StreamFileReader objects."""
        return list(self._files.values())

    def size(self):
        """Return the end offset of the last data block, or 0 with no blocks."""
        if not self._data_locators:
            # Previously this raised IndexError for a stream without blocks.
            return 0
        n = self._data_locators[-1]
        return n.range_start + n.range_size

    def locators_and_ranges(self, range_start, range_size):
        """Return the block ranges covering the given span of the stream."""
        return locators_and_ranges(self._data_locators, range_start, range_size)

    @retry_method
    def _keepget(self, locator, num_retries=None):
        """Fetch one block from self._keep by locator."""
        return self._keep.get(locator, num_retries=num_retries)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''
        if self._keep is None:
            # No client was supplied to the constructor; create one now.
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for lr in locators_and_ranges(self._data_locators, start, size):
            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
        return b''.join(data)

    def manifest_text(self, strip=False):
        """Return the stream as one manifest line ending in a newline.

        The line holds the stream name, the block locators, then each file
        segment as ``locator:range_size:name``.  Spaces in names are
        written as ``\\040``.  With strip=True every block locator is cut
        down to its leading ``<md5>+<size>`` part.
        """
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d.locator for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in self._files.values()])
        return ' '.join(manifest_text) + '\n'
@util._deprecated('3.0', 'arvados.collection.Collection')
StreamReader(tokens, keep=None, debug=False, _empty=False, num_retries=10)
# Fixed: the replacement name used to be misspelled 'Collecttion'.
@util._deprecated('3.0', 'arvados.collection.Collection')
def __init__(self, tokens, keep=None, debug=False, _empty=False,
             num_retries=10):
    """Build a reader from an iterable of manifest tokens.

    Arguments:

    * tokens --- the tokens of one stream.  The first token is the stream
      name; each later token is a block locator (``<32 hex digits>+<size>``
      with optional ``+hint`` suffixes) or a file segment
      (``<position>:<size>:<name>``).
    * keep --- stored as self._keep; may be None.
    * debug --- when true, print every token as it is parsed.
    * _empty --- not used by this constructor.
    * num_retries --- stored as self.num_retries.

    Raises errors.SyntaxError when a token after the stream name is
    neither a block locator nor a file segment.
    """
    self._stream_name = None
    self._data_locators = []
    self._files = collections.OrderedDict()
    self._keep = keep
    self.num_retries = num_retries

    # Compile the two token patterns once instead of on every token.
    locator_re = re.compile(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$')
    segment_re = re.compile(r'^(\d+):(\d+):(\S+)')

    # Offset of the next data block within the concatenated stream.
    streamoffset = 0

    # parse stream
    for tok in tokens:
        if debug: print('tok', tok)
        if self._stream_name is None:
            # The first token is the stream name; \040 encodes a space.
            self._stream_name = tok.replace('\\040', ' ')
            continue

        s = locator_re.match(tok)
        if s:
            blocksize = int(s.group(1))
            self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
            streamoffset += blocksize
            continue

        s = segment_re.search(tok)
        if s:
            pos = int(s.group(1))
            size = int(s.group(2))
            name = s.group(3).replace('\\040', ' ')
            if name not in self._files:
                self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
            else:
                # A repeated name adds a segment after the file's current end.
                filereader = self._files[name]
                filereader.segments.append(Range(pos, filereader.size(), size))
            continue

        raise errors.SyntaxError("Invalid manifest format")
@retry_method
def
readfrom(self, start, size, num_retries=None):
@retry_method
def readfrom(self, start, size, num_retries=None):
    """Read up to 'size' bytes from the stream, starting at 'start'.

    Returns the bytes of every block range that locators_and_ranges()
    reports for the requested span, joined in order.  A size of 0 returns
    b'' without touching the Keep client.
    """
    if size == 0:
        return b''
    if self._keep is None:
        # No client has been set yet; create one on first use.
        self._keep = KeepClient(num_retries=self.num_retries)
    return b''.join(
        self._keepget(piece.locator, num_retries=num_retries)[
            piece.segment_offset:piece.segment_offset + piece.segment_size]
        for piece in locators_and_ranges(self._data_locators, start, size))
Read up to ‘size’ bytes from the stream, starting at ‘start’
def
manifest_text(self, strip=False):
def manifest_text(self, strip=False):
    """Return the stream as one manifest line ending in a newline.

    The line holds the stream name, then every block locator, then each
    file segment formatted as ``locator:range_size:name``, all separated
    by single spaces.  Spaces in the stream name and in file names are
    written as ``\\040``.  With strip=True every block locator is cut down
    to its leading ``<md5>+<size>`` part.
    """
    tokens = [self.name().replace(' ', '\\040')]
    for block in self._data_locators:
        if strip:
            # Keep only the hash and size; drop any trailing +hint fields.
            bare = re.match(r'^[0-9a-f]{32}\+\d+', block.locator)
            tokens.append(bare.group(0))
        else:
            tokens.append(block.locator)
    for stream_file in self._files.values():
        # All segments of one file form a single space-joined token group.
        tokens.append(' '.join(
            "{}:{}:{}".format(seg.locator, seg.range_size,
                              stream_file.name.replace(' ', '\\040'))
            for seg in stream_file.segments))
    return ' '.join(tokens) + '\n'