Package arvados :: Module stream
[hide private]
[frames | no frames]

Source Code for Module arvados.stream

  1  # Copyright (C) The Arvados Authors. All rights reserved. 
  2  # 
  3  # SPDX-License-Identifier: Apache-2.0 
  4   
  5  from __future__ import print_function 
  6  from __future__ import absolute_import 
  7  from future.utils import listvalues 
  8  from builtins import object 
  9  import collections 
 10  import hashlib 
 11  import os 
 12  import re 
 13  import threading 
 14  import functools 
 15  import copy 
 16   
 17  from ._ranges import locators_and_ranges, Range 
 18  from .arvfile import StreamFileReader 
 19  from arvados.retry import retry_method 
 20  from arvados.keep import * 
 21  from . import config 
 22  from . import errors 
 23  from ._normalize_stream import normalize_stream 
class StreamReader(object):
    """Parsed, read-only view of a single manifest stream.

    A stream is built from a sequence of tokens: the first token is the
    stream name, and every following token is either a block locator
    (``<32 hex digits>+<size>[+hints...]``) or a file segment
    (``<position>:<size>:<filename>``).  Spaces in stream and file names
    are encoded as the literal four characters ``\\040`` in tokens.

    Block locators are recorded as ``Range`` objects laid end to end, so
    each block knows its offset within the stream.  File segments are
    grouped per file name into ``StreamFileReader`` objects, in the order
    the names first appear.
    """

    # Block locator token; group 1 captures the block size in bytes.
    _BLOCK_LOCATOR_RE = re.compile(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$')
    # File segment token: position, size and (escaped) file name.
    _FILE_SEGMENT_RE = re.compile(r'^(\d+):(\d+):(\S+)')
    # Leading "hash+size" part of a locator, i.e. the locator minus hints.
    _BARE_LOCATOR_RE = re.compile(r'^[0-9a-f]{32}\+\d+')

    def __init__(self, tokens, keep=None, debug=False, _empty=False,
                 num_retries=0):
        """Parse ``tokens`` into block ranges and per-file readers.

        Arguments:
          tokens: iterable of manifest token strings; the first one is
            taken as the stream name.
          keep: object whose ``get(locator, num_retries=...)`` returns block
            data.  When None, a ``KeepClient`` is created on first read.
          debug: when true, print every token as it is parsed.
          _empty: accepted for backward compatibility; not used here.
          num_retries: stored on the instance and passed to the lazily
            created ``KeepClient``.

        Raises:
          errors.SyntaxError: if a token after the stream name is neither
            a block locator nor a file segment.
        """
        self._stream_name = None
        self._data_locators = []
        self._files = collections.OrderedDict()
        self._keep = keep
        self.num_retries = num_retries

        # Running total of block sizes: the stream offset of the next block.
        streamoffset = 0

        # parse stream
        for tok in tokens:
            if debug:
                print('tok', tok)
            if self._stream_name is None:
                self._stream_name = tok.replace('\\040', ' ')
                continue

            s = self._BLOCK_LOCATOR_RE.match(tok)
            if s:
                blocksize = int(s.group(1))
                self._data_locators.append(
                    Range(tok, streamoffset, blocksize, 0))
                streamoffset += blocksize
                continue

            s = self._FILE_SEGMENT_RE.search(tok)
            if s:
                pos = int(s.group(1))
                size = int(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                if name not in self._files:
                    self._files[name] = StreamFileReader(
                        self, [Range(pos, 0, size, 0)], name)
                else:
                    # Further segments of a known file start at the
                    # reader's current size().
                    filereader = self._files[name]
                    filereader.segments.append(
                        Range(pos, filereader.size(), size))
                continue

            raise errors.SyntaxError("Invalid manifest format")

    def name(self):
        """Return the stream name with ``\\040`` decoded to spaces.

        This is None if no tokens were supplied to the constructor.
        """
        return self._stream_name

    def files(self):
        """Return the ordered mapping of file name to StreamFileReader."""
        return self._files

    def all_files(self):
        """Return the StreamFileReader objects of this stream as a list."""
        return listvalues(self._files)

    def size(self):
        """Return the total size in bytes of the stream's data blocks.

        The size is the end offset of the last block.  A stream without
        any block locators has size 0.
        """
        if not self._data_locators:
            # No blocks were parsed; indexing [-1] would raise IndexError.
            return 0
        last = self._data_locators[-1]
        return last.range_start + last.range_size

    def locators_and_ranges(self, range_start, range_size):
        """Return the block ranges covering the requested byte range.

        Delegates to the module-level ``locators_and_ranges`` helper using
        this stream's block list.
        """
        return locators_and_ranges(self._data_locators, range_start, range_size)

    # NOTE(review): retry_method presumably supplies a default for
    # num_retries when it is None -- confirm in arvados.retry.
    @retry_method
    def _keepget(self, locator, num_retries=None):
        """Fetch one block's data from the Keep client."""
        return self._keep.get(locator, num_retries=num_retries)

    @retry_method
    def readfrom(self, start, size, num_retries=None):
        """Read up to 'size' bytes from the stream, starting at 'start'.

        Returns the data as bytes; a request for 0 bytes returns ``b''``
        without contacting Keep.  If no Keep client was given to the
        constructor, one is created here on first use.
        """
        if size == 0:
            return b''
        if self._keep is None:
            self._keep = KeepClient(num_retries=self.num_retries)
        data = []
        for lr in locators_and_ranges(self._data_locators, start, size):
            block = self._keepget(lr.locator, num_retries=num_retries)
            # Keep only the part of the block inside the requested range.
            data.append(
                block[lr.segment_offset:lr.segment_offset + lr.segment_size])
        return b''.join(data)

    def manifest_text(self, strip=False):
        """Return this stream as one line of manifest text.

        The line is the escaped stream name, then the block locators, then
        the ``position:size:name`` segments of each file, separated by
        spaces and terminated by a newline.

        Arguments:
          strip: when true, emit only the leading ``hash+size`` part of
            each block locator, dropping any further ``+hint`` fields.
        """
        manifest_text = [self.name().replace(' ', '\\040')]
        if strip:
            for d in self._data_locators:
                m = self._BARE_LOCATOR_RE.match(d.locator)
                manifest_text.append(m.group(0))
        else:
            manifest_text.extend([d.locator for d in self._data_locators])
        # For file segments, Range.locator holds the segment's position
        # within the stream (see __init__).
        manifest_text.extend(
            [' '.join(["{}:{}:{}".format(seg.locator, seg.range_size,
                                         f.name.replace(' ', '\\040'))
                       for seg in f.segments])
             for f in listvalues(self._files)])
        return ' '.join(manifest_text) + '\n'
108