arvados.stream

  1# Copyright (C) The Arvados Authors. All rights reserved.
  2#
  3# SPDX-License-Identifier: Apache-2.0
  4
  5from __future__ import print_function
  6from __future__ import absolute_import
  7from future.utils import listvalues
  8from builtins import object
  9import collections
 10import hashlib
 11import os
 12import re
 13import threading
 14import functools
 15import copy
 16
 17from ._ranges import locators_and_ranges, Range
 18from .arvfile import StreamFileReader
 19from arvados.retry import retry_method
 20from arvados.keep import *
 21from . import config
 22from . import errors
 23from . import util
 24from ._normalize_stream import normalize_stream
 25
 26class StreamReader(object):
 27    @util._deprecated('3.0', 'arvados.collection.Collecttion')
 28    def __init__(self, tokens, keep=None, debug=False, _empty=False,
 29                 num_retries=10):
 30        self._stream_name = None
 31        self._data_locators = []
 32        self._files = collections.OrderedDict()
 33        self._keep = keep
 34        self.num_retries = num_retries
 35
 36        streamoffset = 0
 37
 38        # parse stream
 39        for tok in tokens:
 40            if debug: print('tok', tok)
 41            if self._stream_name is None:
 42                self._stream_name = tok.replace('\\040', ' ')
 43                continue
 44
 45            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
 46            if s:
 47                blocksize = int(s.group(1))
 48                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
 49                streamoffset += blocksize
 50                continue
 51
 52            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
 53            if s:
 54                pos = int(s.group(1))
 55                size = int(s.group(2))
 56                name = s.group(3).replace('\\040', ' ')
 57                if name not in self._files:
 58                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
 59                else:
 60                    filereader = self._files[name]
 61                    filereader.segments.append(Range(pos, filereader.size(), size))
 62                continue
 63
 64            raise errors.SyntaxError("Invalid manifest format")
 65
 66    def name(self):
 67        return self._stream_name
 68
 69    def files(self):
 70        return self._files
 71
 72    def all_files(self):
 73        return listvalues(self._files)
 74
 75    def size(self):
 76        n = self._data_locators[-1]
 77        return n.range_start + n.range_size
 78
 79    def locators_and_ranges(self, range_start, range_size):
 80        return locators_and_ranges(self._data_locators, range_start, range_size)
 81
 82    @retry_method
 83    def _keepget(self, locator, num_retries=None):
 84        return self._keep.get(locator, num_retries=num_retries)
 85
 86    @retry_method
 87    def readfrom(self, start, size, num_retries=None):
 88        """Read up to 'size' bytes from the stream, starting at 'start'"""
 89        if size == 0:
 90            return b''
 91        if self._keep is None:
 92            self._keep = KeepClient(num_retries=self.num_retries)
 93        data = []
 94        for lr in locators_and_ranges(self._data_locators, start, size):
 95            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
 96        return b''.join(data)
 97
 98    def manifest_text(self, strip=False):
 99        manifest_text = [self.name().replace(' ', '\\040')]
100        if strip:
101            for d in self._data_locators:
102                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
103                manifest_text.append(m.group(0))
104        else:
105            manifest_text.extend([d.locator for d in self._data_locators])
106        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
107                                        for seg in f.segments])
108                              for f in listvalues(self._files)])
109        return ' '.join(manifest_text) + '\n'
class StreamReader:
 27class StreamReader(object):
 28    @util._deprecated('3.0', 'arvados.collection.Collecttion')
 29    def __init__(self, tokens, keep=None, debug=False, _empty=False,
 30                 num_retries=10):
 31        self._stream_name = None
 32        self._data_locators = []
 33        self._files = collections.OrderedDict()
 34        self._keep = keep
 35        self.num_retries = num_retries
 36
 37        streamoffset = 0
 38
 39        # parse stream
 40        for tok in tokens:
 41            if debug: print('tok', tok)
 42            if self._stream_name is None:
 43                self._stream_name = tok.replace('\\040', ' ')
 44                continue
 45
 46            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
 47            if s:
 48                blocksize = int(s.group(1))
 49                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
 50                streamoffset += blocksize
 51                continue
 52
 53            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
 54            if s:
 55                pos = int(s.group(1))
 56                size = int(s.group(2))
 57                name = s.group(3).replace('\\040', ' ')
 58                if name not in self._files:
 59                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
 60                else:
 61                    filereader = self._files[name]
 62                    filereader.segments.append(Range(pos, filereader.size(), size))
 63                continue
 64
 65            raise errors.SyntaxError("Invalid manifest format")
 66
 67    def name(self):
 68        return self._stream_name
 69
 70    def files(self):
 71        return self._files
 72
 73    def all_files(self):
 74        return listvalues(self._files)
 75
 76    def size(self):
 77        n = self._data_locators[-1]
 78        return n.range_start + n.range_size
 79
 80    def locators_and_ranges(self, range_start, range_size):
 81        return locators_and_ranges(self._data_locators, range_start, range_size)
 82
 83    @retry_method
 84    def _keepget(self, locator, num_retries=None):
 85        return self._keep.get(locator, num_retries=num_retries)
 86
 87    @retry_method
 88    def readfrom(self, start, size, num_retries=None):
 89        """Read up to 'size' bytes from the stream, starting at 'start'"""
 90        if size == 0:
 91            return b''
 92        if self._keep is None:
 93            self._keep = KeepClient(num_retries=self.num_retries)
 94        data = []
 95        for lr in locators_and_ranges(self._data_locators, start, size):
 96            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
 97        return b''.join(data)
 98
 99    def manifest_text(self, strip=False):
100        manifest_text = [self.name().replace(' ', '\\040')]
101        if strip:
102            for d in self._data_locators:
103                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
104                manifest_text.append(m.group(0))
105        else:
106            manifest_text.extend([d.locator for d in self._data_locators])
107        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
108                                        for seg in f.segments])
109                              for f in listvalues(self._files)])
110        return ' '.join(manifest_text) + '\n'
@util._deprecated('3.0', 'arvados.collection.Collection')
StreamReader(tokens, keep=None, debug=False, _empty=False, num_retries=10)
28    @util._deprecated('3.0', 'arvados.collection.Collecttion')
29    def __init__(self, tokens, keep=None, debug=False, _empty=False,
30                 num_retries=10):
31        self._stream_name = None
32        self._data_locators = []
33        self._files = collections.OrderedDict()
34        self._keep = keep
35        self.num_retries = num_retries
36
37        streamoffset = 0
38
39        # parse stream
40        for tok in tokens:
41            if debug: print('tok', tok)
42            if self._stream_name is None:
43                self._stream_name = tok.replace('\\040', ' ')
44                continue
45
46            s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
47            if s:
48                blocksize = int(s.group(1))
49                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
50                streamoffset += blocksize
51                continue
52
53            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
54            if s:
55                pos = int(s.group(1))
56                size = int(s.group(2))
57                name = s.group(3).replace('\\040', ' ')
58                if name not in self._files:
59                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
60                else:
61                    filereader = self._files[name]
62                    filereader.segments.append(Range(pos, filereader.size(), size))
63                continue
64
65            raise errors.SyntaxError("Invalid manifest format")
num_retries
def name(self):
67    def name(self):
68        return self._stream_name
def files(self):
70    def files(self):
71        return self._files
def all_files(self):
73    def all_files(self):
74        return listvalues(self._files)
def size(self):
76    def size(self):
77        n = self._data_locators[-1]
78        return n.range_start + n.range_size
def locators_and_ranges(self, range_start, range_size):
80    def locators_and_ranges(self, range_start, range_size):
81        return locators_and_ranges(self._data_locators, range_start, range_size)
@retry_method
def readfrom(self, start, size, num_retries=None):
87    @retry_method
88    def readfrom(self, start, size, num_retries=None):
89        """Read up to 'size' bytes from the stream, starting at 'start'"""
90        if size == 0:
91            return b''
92        if self._keep is None:
93            self._keep = KeepClient(num_retries=self.num_retries)
94        data = []
95        for lr in locators_and_ranges(self._data_locators, start, size):
96            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
97        return b''.join(data)

Read up to ‘size’ bytes from the stream, starting at ‘start’

def manifest_text(self, strip=False):
 99    def manifest_text(self, strip=False):
100        manifest_text = [self.name().replace(' ', '\\040')]
101        if strip:
102            for d in self._data_locators:
103                m = re.match(r'^[0-9a-f]{32}\+\d+', d.locator)
104                manifest_text.append(m.group(0))
105        else:
106            manifest_text.extend([d.locator for d in self._data_locators])
107        manifest_text.extend([' '.join(["{}:{}:{}".format(seg.locator, seg.range_size, f.name.replace(' ', '\\040'))
108                                        for seg in f.segments])
109                              for f in listvalues(self._files)])
110        return ' '.join(manifest_text) + '\n'