arvados.stream
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import print_function
from __future__ import absolute_import
from future.utils import listvalues
from builtins import object
import collections
import hashlib
import os
import re
import threading
import functools
import copy

from ._ranges import locators_and_ranges, Range
from .arvfile import StreamFileReader
from arvados.retry import retry_method
from arvados.keep import *
from . import config
from . import errors
from . import util
from ._normalize_stream import normalize_stream

class StreamReader(object):
    """Parsed form of one manifest stream: a name, data blocks, and files.

    The constructor consumes an iterable of manifest tokens and records
    the stream name, the ordered list of data block ranges, and one
    StreamFileReader per distinct file name.
    """

    # The second argument is presumably the replacement API named in the
    # deprecation notice (confirm against util._deprecated); it previously
    # carried the misspelling 'Collecttion'.
    @util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=10):
        """Build the stream from manifest tokens.

        Arguments:
          tokens: iterable of strings.  The first token is the stream
            name; each later token must be either a block locator
            (32 hex digits, '+', size, optional '+hint' suffixes) or a
            file segment of the form 'position:size:filename'.
          keep: client object used by readfrom() to fetch blocks.  When
            None, a KeepClient is created on the first read.
          debug: when true, print every token as it is parsed.
          _empty: accepted but not used by this constructor.
          num_retries: stored on the instance; passed to the KeepClient
            created lazily by readfrom().

        Raises:
          errors.SyntaxError: a token after the stream name matches
            neither the locator nor the file segment pattern.
        """
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        # Running offset of the next data block within the stream.
        streamoffset = 0

        # parse stream
        for tok in tokens:
            if debug: print('tok', tok)
            if self._stream_name is None:
                # Names encode spaces as backslash-040; undo that here.
                self._stream_name = tok.replace('\\040', ' ')
                continue

            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
            if s:
                blocksize = int(s.group(1))
                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
                streamoffset += blocksize
                continue

            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = int(s.group(1))
                size = int(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
                else:
                    # A repeated file name adds another segment, placed
                    # at the file's current size.
                    filereader = self._files[name]
                    filereader.segments.append(Range(pos, filereader.size(), size))
                continue

            raise errors.SyntaxError("Invalid manifest format")

    def name(self):
        """Return the stream name with spaces unescaped."""
        return self._stream_name

    def files(self):
        """Return the ordered mapping of file name to StreamFileReader."""
        return self._files

    def all_files(self):
        """Return the StreamFileReader objects as a list."""
        return listvalues(self._files)

    def size(self):
        """Return the total size of the stream's data blocks.

        Returns 0 when the stream has no data blocks; the lookup of the
        last block previously raised IndexError in that case.
        """
        if not self._data_locators:
            return 0
        n = self._data_locators[-1]
        return n.range_start + n.range_size

    def locators_and_ranges(self, range_start, range_size):
        """Return the block ranges covering the requested span.

        Delegates to the module-level locators_and_ranges() helper with
        this stream's data block list.
        """
        return locators_and_ranges(self._data_locators, range_start, range_size)

    @retry_method
    def _keepget(self, locator, num_retries=None):
        """Fetch one block from the Keep client by locator."""
        return self._keep.get(locator, num_retries=num_retries)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''
        if self._keep is None:
            # Create the client lazily so construction needs no Keep access.
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for lr in locators_and_ranges(self._data_locators, start, size):
            block = self._keepget(lr.locator, num_retries=num_retries)
            data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
        return b''.join(data)

    def manifest_text(self, strip=False):
        """Return this stream as one line of manifest text.

        Arguments:
          strip: when true, reduce each block locator to its
            'hash+size' prefix, dropping any further '+hint' suffixes.

        The line is the escaped stream name, the block locators, and
        the 'position:size:name' file segments, joined by spaces and
        ended with a newline.
        """
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d.locator for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in listvalues(self._files)])
        return ' '.join(manifest_text) + '\n'
class
StreamReader:
class StreamReader(object):
    """Parsed form of one manifest stream: a name, data blocks, and files.

    The constructor consumes an iterable of manifest tokens and records
    the stream name, the ordered list of data block ranges, and one
    StreamFileReader per distinct file name.
    """

    # The second argument is presumably the replacement API named in the
    # deprecation notice (confirm against util._deprecated); it previously
    # carried the misspelling 'Collecttion'.
    @util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=10):
        """Build the stream from manifest tokens.

        Arguments:
          tokens: iterable of strings.  The first token is the stream
            name; each later token must be either a block locator
            (32 hex digits, '+', size, optional '+hint' suffixes) or a
            file segment of the form 'position:size:filename'.
          keep: client object used by readfrom() to fetch blocks.  When
            None, a KeepClient is created on the first read.
          debug: when true, print every token as it is parsed.
          _empty: accepted but not used by this constructor.
          num_retries: stored on the instance; passed to the KeepClient
            created lazily by readfrom().

        Raises:
          errors.SyntaxError: a token after the stream name matches
            neither the locator nor the file segment pattern.
        """
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        # Running offset of the next data block within the stream.
        streamoffset = 0

        # parse stream
        for tok in tokens:
            if debug: print('tok', tok)
            if self._stream_name is None:
                # Names encode spaces as backslash-040; undo that here.
                self._stream_name = tok.replace('\\040', ' ')
                continue

            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
            if s:
                blocksize = int(s.group(1))
                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
                streamoffset += blocksize
                continue

            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = int(s.group(1))
                size = int(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
                else:
                    # A repeated file name adds another segment, placed
                    # at the file's current size.
                    filereader = self._files[name]
                    filereader.segments.append(Range(pos, filereader.size(), size))
                continue

            raise errors.SyntaxError("Invalid manifest format")

    def name(self):
        """Return the stream name with spaces unescaped."""
        return self._stream_name

    def files(self):
        """Return the ordered mapping of file name to StreamFileReader."""
        return self._files

    def all_files(self):
        """Return the StreamFileReader objects as a list."""
        return listvalues(self._files)

    def size(self):
        """Return the total size of the stream's data blocks.

        Returns 0 when the stream has no data blocks; the lookup of the
        last block previously raised IndexError in that case.
        """
        if not self._data_locators:
            return 0
        n = self._data_locators[-1]
        return n.range_start + n.range_size

    def locators_and_ranges(self, range_start, range_size):
        """Return the block ranges covering the requested span.

        Delegates to the module-level locators_and_ranges() helper with
        this stream's data block list.
        """
        return locators_and_ranges(self._data_locators, range_start, range_size)

    @retry_method
    def _keepget(self, locator, num_retries=None):
        """Fetch one block from the Keep client by locator."""
        return self._keep.get(locator, num_retries=num_retries)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'"""
        if size == 0:
            return b''
        if self._keep is None:
            # Create the client lazily so construction needs no Keep access.
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for lr in locators_and_ranges(self._data_locators, start, size):
            block = self._keepget(lr.locator, num_retries=num_retries)
            data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
        return b''.join(data)

    def manifest_text(self, strip=False):
        """Return this stream as one line of manifest text.

        Arguments:
          strip: when true, reduce each block locator to its
            'hash+size' prefix, dropping any further '+hint' suffixes.

        The line is the escaped stream name, the block locators, and
        the 'position:size:name' file segments, joined by spaces and
        ended with a newline.
        """
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            for d in self._data_locators:
                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d.locator for d in self._data_locators])
        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
                                        for seg in f.segments])
                              for f in listvalues(self._files)])
        return ' '.join(manifest_text) + '\n'
@util._deprecated('3.0', 'arvados.collection.Collection')
StreamReader(tokens, keep=None, debug=False, _empty=False, num_retries=10)
# The second argument is presumably the replacement API named in the
# deprecation notice (confirm against util._deprecated); it previously
# carried the misspelling 'Collecttion'.
@util._deprecated('3.0', 'arvados.collection.Collection')
def __init__(self, tokens, keep=None, debug=False, _empty=False,
             num_retries=10):
    """Build the stream from manifest tokens.

    Arguments:
      tokens: iterable of strings.  The first token is the stream
        name; each later token must be either a block locator
        (32 hex digits, '+', size, optional '+hint' suffixes) or a
        file segment of the form 'position:size:filename'.
      keep: stored as the instance's Keep client; may be None.
      debug: when true, print every token as it is parsed.
      _empty: accepted but not used by this constructor.
      num_retries: stored on the instance as num_retries.

    Raises:
      errors.SyntaxError: a token after the stream name matches
        neither the locator nor the file segment pattern.
    """
    self._stream_name = None
    self._data_locators = []
    self._files = collections.OrderedDict()
    self._keep = keep
    self.num_retries = num_retries

    # Running offset of the next data block within the stream.
    streamoffset = 0

    # parse stream
    for tok in tokens:
        if debug: print('tok', tok)
        if self._stream_name is None:
            # Names encode spaces as backslash-040; undo that here.
            self._stream_name = tok.replace('\\040', ' ')
            continue

        s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
        if s:
            blocksize = int(s.group(1))
            self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
            streamoffset += blocksize
            continue

        s = re.search(r'^(\d+):(\d+):(\S+)', tok)
        if s:
            pos = int(s.group(1))
            size = int(s.group(2))
            name = s.group(3).replace('\\040', ' ')
            if name not in self._files:
                self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
            else:
                # A repeated file name adds another segment, placed at
                # the file's current size.
                filereader = self._files[name]
                filereader.segments.append(Range(pos, filereader.size(), size))
            continue

        raise errors.SyntaxError("Invalid manifest format")
@retry_method
def
readfrom(self, start, size, num_retries=None):
@retry_method
def readfrom(self, start, size, num_retries=None):
    """Return up to 'size' bytes of stream data beginning at offset 'start'.

    A zero-length request returns b'' without touching Keep.  If no
    Keep client was supplied, one is created on first use with the
    instance's num_retries setting.
    """
    if size == 0:
        return b''
    if self._keep is None:
        self._keep = KeepClient(num_retries=self.num_retries)
    pieces = []
    for span in locators_and_ranges(self._data_locators, start, size):
        # Fetch the whole block, then keep only the requested slice.
        block = self._keepget(span.locator, num_retries=num_retries)
        pieces.append(block[span.segment_offset:span.segment_offset + span.segment_size])
    return b''.join(pieces)
Read up to ‘size’ bytes from the stream, starting at ‘start’
def
manifest_text(self, strip=False):
def manifest_text(self, strip=False):
    """Serialize this stream as a single manifest line.

    The line holds the stream name (spaces escaped as backslash-040),
    then the data block locators, then the 'position:size:name'
    segments of every file, separated by spaces and ended by a newline.
    With strip=True each locator is cut down to its 'hash+size' prefix.
    """
    parts = [self.name().replace(' ', '\\040')]
    if strip:
        parts += [re.match(r'^[0-9a-f]{32}\+\d+', blk.locator).group(0)
                  for blk in self._data_locators]
    else:
        parts += [blk.locator for blk in self._data_locators]
    for reader in listvalues(self._files):
        # All segments of one file form a single space-joined entry.
        file_entry = ' '.join(
            "{}:{}:{}".format(piece.locator, piece.range_size, reader.name.replace(' ', '\\040'))
            for piece in reader.segments)
        parts.append(file_entry)
    return ' '.join(parts) + '\n'