arvados.stream

  1# Copyright (C) The Arvados Authors. All rights reserved.
  2#
  3# SPDX-License-Identifier: Apache-2.0
  4
  5import collections
  6import hashlib
  7import os
  8import re
  9import threading
 10import functools
 11import copy
 12
 13from ._ranges import locators_and_ranges, Range
 14from .arvfile import StreamFileReader
 15from arvados.retry import retry_method
 16from arvados.keep import *
 17from . import config
 18from . import errors
 19from . import util
 20from ._normalize_stream import normalize_stream
 21
 22class StreamReader(object):
 23    @util._deprecated('3.0', 'arvados.collection.Collecttion')
 24    def __init__(self, tokens, keep=None, debug=False, _empty=False,
 25                 num_retries=10):
 26        self._stream_name = None
 27        self._data_locators = []
 28        self._files = collections.OrderedDict()
 29        self._keep = keep
 30        self.num_retries = num_retries
 31
 32        streamoffset = 0
 33
 34        # parse stream
 35        for tok in tokens:
 36            if debug: print('tok', tok)
 37            if self._stream_name is None:
 38                self._stream_name = tok.replace('\\040', ' ')
 39                continue
 40
 41            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
 42            if s:
 43                blocksize = int(s.group(1))
 44                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
 45                streamoffset += blocksize
 46                continue
 47
 48            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
 49            if s:
 50                pos = int(s.group(1))
 51                size = int(s.group(2))
 52                name = s.group(3).replace('\\040', ' ')
 53                if name not in self._files:
 54                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
 55                else:
 56                    filereader = self._files[name]
 57                    filereader.segments.append(Range(pos, filereader.size(), size))
 58                continue
 59
 60            raise errors.SyntaxError("Invalid manifest format")
 61
 62    def name(self):
 63        return self._stream_name
 64
 65    def files(self):
 66        return self._files
 67
 68    def all_files(self):
 69        return list(self._files.values())
 70
 71    def size(self):
 72        n = self._data_locators[-1]
 73        return n.range_start + n.range_size
 74
 75    def locators_and_ranges(self, range_start, range_size):
 76        return locators_and_ranges(self._data_locators, range_start, range_size)
 77
 78    @retry_method
 79    def _keepget(self, locator, num_retries=None):
 80        return self._keep.get(locator, num_retries=num_retries)
 81
 82    @retry_method
 83    def readfrom(self, start, size, num_retries=None):
 84        """Read up to 'size' bytes from the stream, starting at 'start'"""
 85        if size == 0:
 86            return b''
 87        if self._keep is None:
 88            self._keep = KeepClient(num_retries=self.num_retries)
 89        data = []
 90        for lr in locators_and_ranges(self._data_locators, start, size):
 91            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
 92        return b''.join(data)
 93
 94    def manifest_text(self, strip=False):
 95        manifest_text = [self.name().replace(' ', '\\040')]
 96        if strip:
 97            for d in self._data_locators:
 98                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
 99                manifest_text.append(m.group(0))
100        else:
101            manifest_text.extend([d.locator for d in self._data_locators])
102        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
103                                        for seg in f.segments])
104                              for f in self._files.values()])
105        return ' '.join(manifest_text) + '\n'
class StreamReader:
 23class StreamReader(object):
 24    @util._deprecated('3.0', 'arvados.collection.Collecttion')
 25    def __init__(self, tokens, keep=None, debug=False, _empty=False,
 26                 num_retries=10):
 27        self._stream_name = None
 28        self._data_locators = []
 29        self._files = collections.OrderedDict()
 30        self._keep = keep
 31        self.num_retries = num_retries
 32
 33        streamoffset = 0
 34
 35        # parse stream
 36        for tok in tokens:
 37            if debug: print('tok', tok)
 38            if self._stream_name is None:
 39                self._stream_name = tok.replace('\\040', ' ')
 40                continue
 41
 42            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
 43            if s:
 44                blocksize = int(s.group(1))
 45                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
 46                streamoffset += blocksize
 47                continue
 48
 49            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
 50            if s:
 51                pos = int(s.group(1))
 52                size = int(s.group(2))
 53                name = s.group(3).replace('\\040', ' ')
 54                if name not in self._files:
 55                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
 56                else:
 57                    filereader = self._files[name]
 58                    filereader.segments.append(Range(pos, filereader.size(), size))
 59                continue
 60
 61            raise errors.SyntaxError("Invalid manifest format")
 62
 63    def name(self):
 64        return self._stream_name
 65
 66    def files(self):
 67        return self._files
 68
 69    def all_files(self):
 70        return list(self._files.values())
 71
 72    def size(self):
 73        n = self._data_locators[-1]
 74        return n.range_start + n.range_size
 75
 76    def locators_and_ranges(self, range_start, range_size):
 77        return locators_and_ranges(self._data_locators, range_start, range_size)
 78
 79    @retry_method
 80    def _keepget(self, locator, num_retries=None):
 81        return self._keep.get(locator, num_retries=num_retries)
 82
 83    @retry_method
 84    def readfrom(self, start, size, num_retries=None):
 85        """Read up to 'size' bytes from the stream, starting at 'start'"""
 86        if size == 0:
 87            return b''
 88        if self._keep is None:
 89            self._keep = KeepClient(num_retries=self.num_retries)
 90        data = []
 91        for lr in locators_and_ranges(self._data_locators, start, size):
 92            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
 93        return b''.join(data)
 94
 95    def manifest_text(self, strip=False):
 96        manifest_text = [self.name().replace(' ', '\\040')]
 97        if strip:
 98            for d in self._data_locators:
 99                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
100                manifest_text.append(m.group(0))
101        else:
102            manifest_text.extend([d.locator for d in self._data_locators])
103        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
104                                        for seg in f.segments])
105                              for f in self._files.values()])
106        return ' '.join(manifest_text) + '\n'
@util._deprecated('3.0', 'arvados.collection.Collecttion')
StreamReader(tokens, keep=None, debug=False, _empty=False, num_retries=10)
24    @util._deprecated('3.0', 'arvados.collection.Collecttion')
25    def __init__(self, tokens, keep=None, debug=False, _empty=False,
26                 num_retries=10):
27        self._stream_name = None
28        self._data_locators = []
29        self._files = collections.OrderedDict()
30        self._keep = keep
31        self.num_retries = num_retries
32
33        streamoffset = 0
34
35        # parse stream
36        for tok in tokens:
37            if debug: print('tok', tok)
38            if self._stream_name is None:
39                self._stream_name = tok.replace('\\040', ' ')
40                continue
41
42            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
43            if s:
44                blocksize = int(s.group(1))
45                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
46                streamoffset += blocksize
47                continue
48
49            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
50            if s:
51                pos = int(s.group(1))
52                size = int(s.group(2))
53                name = s.group(3).replace('\\040', ' ')
54                if name not in self._files:
55                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
56                else:
57                    filereader = self._files[name]
58                    filereader.segments.append(Range(pos, filereader.size(), size))
59                continue
60
61            raise errors.SyntaxError("Invalid manifest format")
num_retries
def name(self):
63    def name(self):
64        return self._stream_name
def files(self):
66    def files(self):
67        return self._files
def all_files(self):
69    def all_files(self):
70        return list(self._files.values())
def size(self):
72    def size(self):
73        n = self._data_locators[-1]
74        return n.range_start + n.range_size
def locators_and_ranges(self, range_start, range_size):
76    def locators_and_ranges(self, range_start, range_size):
77        return locators_and_ranges(self._data_locators, range_start, range_size)
@retry_method
def readfrom(self, start, size, num_retries=None):
83    @retry_method
84    def readfrom(self, start, size, num_retries=None):
85        """Read up to 'size' bytes from the stream, starting at 'start'"""
86        if size == 0:
87            return b''
88        if self._keep is None:
89            self._keep = KeepClient(num_retries=self.num_retries)
90        data = []
91        for lr in locators_and_ranges(self._data_locators, start, size):
92            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
93        return b''.join(data)

Read up to ‘size’ bytes from the stream, starting at ‘start’

def manifest_text(self, strip=False):
 95    def manifest_text(self, strip=False):
 96        manifest_text = [self.name().replace(' ', '\\040')]
 97        if strip:
 98            for d in self._data_locators:
 99                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
100                manifest_text.append(m.group(0))
101        else:
102            manifest_text.extend([d.locator for d in self._data_locators])
103        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
104                                        for seg in f.segments])
105                              for f in self._files.values()])
106        return ' '.join(manifest_text) + '\n'