1
2
3
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import functools
11 import logging
12 import os
13 import re
14 import errno
15 import hashlib
16 import datetime
17 import ciso8601
18 import time
19 import threading
20
21 from collections import deque
22 from stat import *
23
24 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
25 from .keep import KeepLocator, KeepClient
26 from .stream import StreamReader
27 from ._normalize_stream import normalize_stream
28 from ._ranges import Range, LocatorAndRange
29 from .safeapi import ThreadSafeApiCache
30 import arvados.config as config
31 import arvados.errors as errors
32 import arvados.util
33 import arvados.events as events
34 from arvados.retry import retry_method
35
36 _logger = logging.getLogger('arvados.collection')
39 """Abstract base class for Collection classes."""
40
43
44 - def __exit__(self, exc_type, exc_value, traceback):
46
48 if self._keep_client is None:
49 self._keep_client = KeepClient(api_client=self._api_client,
50 num_retries=self.num_retries)
51 return self._keep_client
52
54 """Get the manifest with locator hints stripped.
55
56 Return the manifest for the current collection with all
57 non-portable hints (i.e., permission signatures and other
58 hints other than size hints) removed from the locators.
59 """
60 raw = self.manifest_text()
61 clean = []
62 for line in raw.split("\n"):
63 fields = line.split()
64 if fields:
65 clean_fields = fields[:1] + [
66 (re.sub(r'\+[^\d][^\+]*', '', x)
67 if re.match(arvados.util.keep_locator_pattern, x)
68 else x)
69 for x in fields[1:]]
70 clean += [' '.join(clean_fields), "\n"]
71 return ''.join(clean)
72
95
98 """Deprecated, use Collection instead."""
99
def __init__(self, api_client=None, num_retries=0, replication=None):
    """Instantiate a CollectionWriter.

    CollectionWriter builds a new Arvados Collection from scratch: write
    files into it, it uploads the data to Keep as needed, and it hands
    back the finished manifest text.

    Arguments:
    * api_client: The API client used to look up Collections.  When
      None, one is built later from the available Arvados configuration.
    * num_retries: Default number of times to retry failed service
      requests (default 0).  May be changed after instantiation, but
      changes may not propagate to related objects such as the Keep
      client.
    * replication: Number of copies of each block to store.  When None,
      the server-provided default is used if available, otherwise 2.
    """
    self._api_client = api_client
    self.num_retries = num_retries
    # Fall back to 2 copies when the caller expressed no preference.
    self.replication = 2 if replication is None else replication
    self._keep_client = None

    # Data buffered locally but not yet uploaded to Keep.
    self._data_buffer = []
    self._data_buffer_len = 0

    # State of the stream (directory) currently being assembled.
    self._current_stream_files = []
    self._current_stream_length = 0
    self._current_stream_locators = []
    self._current_stream_name = '.'

    # State of the file currently being written within that stream.
    self._current_file_name = None
    self._current_file_pos = 0
    self._finished_streams = []

    # Queued local-filesystem work: a single open file, plus pending
    # directory entries and directory trees to walk.
    self._close_file = None
    self._queued_file = None
    self._queued_dirents = deque()
    self._queued_trees = deque()
    self._last_open = None
138
def __exit__(self, exc_type, exc_value, traceback):
    """Context-manager exit: finalize the collection on a clean exit.

    When the with-block raised an exception, finish() is skipped so a
    partial collection is not committed.
    """
    if exc_type is not None:
        return
    self.finish()
142
144
145
146
147
148
149
150
151
152
153
154
155
156
157 while True:
158 if self._queued_file:
159 self._work_file()
160 elif self._queued_dirents:
161 self._work_dirents()
162 elif self._queued_trees:
163 self._work_trees()
164 else:
165 break
166
178
195
197 path, stream_name, max_manifest_depth = self._queued_trees[0]
198 d = arvados.util.listdir_recursive(
199 path, max_depth = (None if max_manifest_depth == 0 else 0))
200 if d:
201 self._queue_dirents(stream_name, d)
202 else:
203 self._queued_trees.popleft()
204
206 assert (self._queued_file is None), "tried to queue more than one file"
207 if not hasattr(source, 'read'):
208 source = open(source, 'rb')
209 self._close_file = True
210 else:
211 self._close_file = False
212 if filename is None:
213 filename = os.path.basename(source.name)
214 self.start_new_file(filename)
215 self._queued_file = source
216
218 assert (not self._queued_dirents), "tried to queue more than one tree"
219 self._queued_dirents = deque(sorted(dirents))
220
221 - def _queue_tree(self, path, stream_name, max_manifest_depth):
223
227
232
def write(self, newdata):
    """Append data to the file currently being written.

    Accepts bytes, str (encoded with the default codec), or any other
    iterable of either, whose elements are written one by one.  Whenever
    a full Keep block's worth of data has accumulated, it is flushed.
    """
    if isinstance(newdata, str):
        newdata = newdata.encode()
    elif not isinstance(newdata, bytes):
        if hasattr(newdata, '__iter__'):
            # An iterable of writable chunks: recurse per element.
            for chunk in newdata:
                self.write(chunk)
            return
    self._data_buffer.append(newdata)
    self._data_buffer_len += len(newdata)
    self._current_stream_length += len(newdata)
    while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
        self.flush_data()
247
def open(self, streampath, filename=None):
    """open(streampath[, filename]) -> file-like object

    Open a writable file-like object that adds a file to the Collection.
    The target may be given as a single path string, or as separate
    stream (directory) name and file name arguments.

    Only one file object from the Collection may be open at a time, so
    close each object before opening the next.  A with statement makes
    that easy::

        with cwriter.open('./doc/page1.txt') as outfile:
            outfile.write(page1_data)
        with cwriter.open('./doc/page2.txt') as outfile:
            outfile.write(page2_data)
    """
    if filename is None:
        streampath, filename = split(streampath)
    previous = self._last_open
    if previous and not previous.closed:
        raise errors.AssertionError(
            "can't open '{}' when '{}' is still open".format(
                filename, previous.name))
    if streampath != self.current_stream_name():
        # Switching directories: finish the old stream, begin a new one.
        self.start_new_stream(streampath)
    self.set_current_file_name(filename)
    self._last_open = _WriterFile(self, filename)
    return self._last_open
276
278 data_buffer = b''.join(self._data_buffer)
279 if data_buffer:
280 self._current_stream_locators.append(
281 self._my_keep().put(
282 data_buffer[0:config.KEEP_BLOCK_SIZE],
283 copies=self.replication))
284 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
285 self._data_buffer_len = len(self._data_buffer[0])
286
290
292 if re.search(r'[\t\n]', newfilename):
293 raise errors.AssertionError(
294 "Manifest filenames cannot contain whitespace: %s" %
295 newfilename)
296 elif re.search(r'\x00', newfilename):
297 raise errors.AssertionError(
298 "Manifest filenames cannot contain NUL characters: %s" %
299 newfilename)
300 self._current_file_name = newfilename
301
303 return self._current_file_name
304
306 if self._current_file_name is None:
307 if self._current_file_pos == self._current_stream_length:
308 return
309 raise errors.AssertionError(
310 "Cannot finish an unnamed file " +
311 "(%d bytes at offset %d in '%s' stream)" %
312 (self._current_stream_length - self._current_file_pos,
313 self._current_file_pos,
314 self._current_stream_name))
315 self._current_stream_files.append([
316 self._current_file_pos,
317 self._current_stream_length - self._current_file_pos,
318 self._current_file_name])
319 self._current_file_pos = self._current_stream_length
320 self._current_file_name = None
321
325
327 if re.search(r'[\t\n]', newstreamname):
328 raise errors.AssertionError(
329 "Manifest stream names cannot contain whitespace: '%s'" %
330 (newstreamname))
331 self._current_stream_name = '.' if newstreamname=='' else newstreamname
332
334 return self._current_stream_name
335
337 self.finish_current_file()
338 self.flush_data()
339 if not self._current_stream_files:
340 pass
341 elif self._current_stream_name is None:
342 raise errors.AssertionError(
343 "Cannot finish an unnamed stream (%d bytes in %d files)" %
344 (self._current_stream_length, len(self._current_stream_files)))
345 else:
346 if not self._current_stream_locators:
347 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
348 self._finished_streams.append([self._current_stream_name,
349 self._current_stream_locators,
350 self._current_stream_files])
351 self._current_stream_files = []
352 self._current_stream_length = 0
353 self._current_stream_locators = []
354 self._current_stream_name = None
355 self._current_file_pos = 0
356 self._current_file_name = None
357
359 """Store the manifest in Keep and return its locator.
360
361 This is useful for storing manifest fragments (task outputs)
362 temporarily in Keep during a Crunch job.
363
364 In other cases you should make a collection instead, by
365 sending manifest_text() to the API server's "create
366 collection" endpoint.
367 """
368 return self._my_keep().put(self.manifest_text().encode(),
369 copies=self.replication)
370
374
def manifest_text(self):
    """Return the manifest text for all finished streams.

    The stream currently in progress is finished first.  Each finished
    stream renders as one manifest line: the stream name (prefixed with
    './' when needed), its block locators, and position:size:filename
    tokens, with spaces escaped as \\040.
    """
    self.finish_current_stream()
    lines = []
    for stream_name, locators, stream_files in self._finished_streams:
        # Stream names must start with '.'; prepend './' when missing.
        prefix = '' if re.search(r'^\.(/.*)?$', stream_name) else './'
        file_tokens = ' '.join(
            "%d:%d:%s" % (pos, size, name.replace(' ', '\\040'))
            for pos, size, name in stream_files)
        lines.append('%s%s %s %s\n' % (
            prefix,
            stream_name.replace(' ', '\\040'),
            ' '.join(locators),
            file_tokens))
    return ''.join(lines)
388
390 ret = []
391 for name, locators, files in self._finished_streams:
392 ret += locators
393 return ret
394
396 return self._api_client.collections().create(
397 ensure_unique_name=True,
398 body={
399 'name': name,
400 'manifest_text': self.manifest_text(),
401 }).execute(num_retries=self.num_retries)
402
405 """Deprecated, use Collection instead."""
406
407 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
408 '_current_stream_locators', '_current_stream_name',
409 '_current_file_name', '_current_file_pos', '_close_file',
410 '_data_buffer', '_dependencies', '_finished_streams',
411 '_queued_dirents', '_queued_trees']
412
413 - def __init__(self, api_client=None, **kwargs):
416
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):
    """Rebuild a writer from a previously dumped state dict.

    Extra positional/keyword arguments are forwarded to the class
    constructor.  Raises StaleWriterStateError when the saved state can
    no longer be resumed: expired Keep permission hints, changed
    dependency files, or an active file that cannot be reopened.
    """
    writer = cls(*init_args, **init_kwargs)
    for attr_name in cls.STATE_PROPS:
        attr_value = state[attr_name]
        attr_class = getattr(writer, attr_name).__class__

        # Coerce the saved value back to the attribute's current type
        # (e.g. a deque that was serialized as a plain list).
        if attr_class not in (type(None), attr_value.__class__):
            attr_value = attr_class(attr_value)
        setattr(writer, attr_name, attr_value)

    # Refuse to resume if any block locator's permission hint has expired.
    if any(KeepLocator(ls).permission_expired()
           for ls in writer._current_stream_locators):
        raise errors.StaleWriterStateError(
            "locators include expired permission hint")
    writer.check_dependencies()
    if state['_current_file'] is not None:
        path, pos = state['_current_file']
        try:
            # Reopen the file that was mid-upload and restore its offset.
            writer._queued_file = open(path, 'rb')
            writer._queued_file.seek(pos)
        except IOError as error:
            raise errors.StaleWriterStateError(
                "failed to reopen active file {}: {}".format(path, error))
    return writer
449
451 for path, orig_stat in listitems(self._dependencies):
452 if not S_ISREG(orig_stat[ST_MODE]):
453 raise errors.StaleWriterStateError("{} not file".format(path))
454 try:
455 now_stat = tuple(os.stat(path))
456 except OSError as error:
457 raise errors.StaleWriterStateError(
458 "failed to stat {}: {}".format(path, error))
459 if ((not S_ISREG(now_stat[ST_MODE])) or
460 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
461 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
462 raise errors.StaleWriterStateError("{} changed".format(path))
463
465 state = {attr: copy_func(getattr(self, attr))
466 for attr in self.STATE_PROPS}
467 if self._queued_file is None:
468 state['_current_file'] = None
469 else:
470 state['_current_file'] = (os.path.realpath(self._queued_file.name),
471 self._queued_file.tell())
472 return state
473
475 try:
476 src_path = os.path.realpath(source)
477 except Exception:
478 raise errors.AssertionError("{} not a file path".format(source))
479 try:
480 path_stat = os.stat(src_path)
481 except OSError as stat_error:
482 path_stat = None
483 super(ResumableCollectionWriter, self)._queue_file(source, filename)
484 fd_stat = os.fstat(self._queued_file.fileno())
485 if not S_ISREG(fd_stat.st_mode):
486
487
488 self._dependencies[source] = tuple(fd_stat)
489 elif path_stat is None:
490 raise errors.AssertionError(
491 "could not stat {}: {}".format(source, stat_error))
492 elif path_stat.st_ino != fd_stat.st_ino:
493 raise errors.AssertionError(
494 "{} changed between open and stat calls".format(source))
495 else:
496 self._dependencies[src_path] = tuple(fd_stat)
497
503
504
505 ADD = "add"
506 DEL = "del"
507 MOD = "mod"
508 TOK = "tok"
509 FILE = "file"
510 COLLECTION = "collection"
513 """Base class for Collections and Subcollections.
514
515 Implements the majority of functionality relating to accessing items in the
516 Collection.
517
518 """
519
521 self.parent = parent
522 self._committed = False
523 self._callback = None
524 self._items = {}
525
528
531
534
537
540
541 - def notify(self, event, collection, name, item):
543
546
547 @must_be_writable
548 @synchronized
550 """Recursively search the specified file path.
551
552 May return either a `Collection` or `ArvadosFile`. If not found, will
553 create a new item at the specified path based on `create_type`. Will
554 create intermediate subcollections needed to contain the final item in
555 the path.
556
557 :create_type:
558 One of `arvados.collection.FILE` or
559 `arvados.collection.COLLECTION`. If the path is not found, and value
560 of create_type is FILE then create and return a new ArvadosFile for
561 the last path component. If COLLECTION, then create and return a new
562 Collection for the last path component.
563
564 """
565
566 pathcomponents = path.split("/", 1)
567 if pathcomponents[0]:
568 item = self._items.get(pathcomponents[0])
569 if len(pathcomponents) == 1:
570 if item is None:
571
572 if create_type == COLLECTION:
573 item = Subcollection(self, pathcomponents[0])
574 else:
575 item = ArvadosFile(self, pathcomponents[0])
576 self._items[pathcomponents[0]] = item
577 self.set_committed(False)
578 self.notify(ADD, self, pathcomponents[0], item)
579 return item
580 else:
581 if item is None:
582
583 item = Subcollection(self, pathcomponents[0])
584 self._items[pathcomponents[0]] = item
585 self.set_committed(False)
586 self.notify(ADD, self, pathcomponents[0], item)
587 if isinstance(item, RichCollectionBase):
588 return item.find_or_create(pathcomponents[1], create_type)
589 else:
590 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
591 else:
592 return self
593
@synchronized
def find(self, path):
    """Recursively look up `path` within this collection.

    Returns the Collection or ArvadosFile found at that path, or None
    when nothing exists there.  Raises IOError for an invalid path
    (e.g. one starting with '/') and ArgumentError for an empty path.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    first, slash, rest = path.partition("/")
    if first == '':
        # Leading '/' (or bare '/'): not a valid relative path.
        raise IOError(errno.ENOTDIR, "Not a directory", first)

    item = self._items.get(first)
    if item is None:
        return None
    if not slash:
        # Single component: direct hit.
        return item
    if not isinstance(item, RichCollectionBase):
        raise IOError(errno.ENOTDIR, "Not a directory", first)
    # "name/" with an empty remainder refers to the subcollection itself.
    return item.find(rest) if rest else item
624
625 @synchronized
627 """Recursive subcollection create.
628
629 Like `os.makedirs()`. Will create intermediate subcollections needed
630 to contain the leaf subcollection path.
631
632 """
633
634 if self.find(path) != None:
635 raise IOError(errno.EEXIST, "Directory or file exists", path)
636
637 return self.find_or_create(path, COLLECTION)
638
def open(self, path, mode="r"):
    """Open a file-like object for access.

    :path:
      path to a file in the collection
    :mode:
      a string consisting of "r", "w", or "a", optionally followed
      by "b" or "t", optionally followed by "+".
    :"b":
      binary mode: write() accepts bytes, read() returns bytes.
    :"t":
      text mode (default): write() accepts strings, read() returns strings.
    :"r":
      opens for reading
    :"r+":
      opens for reading and writing. Reads/writes share a file pointer.
    :"w", "w+":
      truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
    :"a", "a+":
      opens for reading and writing. All writes are appended to
      the end of the file. Writing does not affect the file pointer for
      reading.
    """
    if re.search(r'^[rwa][bt]?\+?$', mode) is None:
        raise errors.ArgumentError("Invalid mode {!r}".format(mode))

    read_only = mode[0] == 'r' and '+' not in mode
    if read_only:
        fclass = ArvadosFileReader
        arvfile = self.find(path)
    else:
        # Any writing mode requires a writable collection.
        if not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        fclass = ArvadosFileWriter
        arvfile = self.find_or_create(path, FILE)

    if arvfile is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if not isinstance(arvfile, ArvadosFile):
        raise IOError(errno.EISDIR, "Is a directory", path)

    if mode[0] == 'w':
        # "w"/"w+" truncate existing contents before use.
        arvfile.truncate(0)

    return fclass(arvfile, mode=mode, num_retries=self.num_retries)
684
686 """Determine if the collection has been modified since last commited."""
687 return not self.committed()
688
689 @synchronized
691 """Determine if the collection has been committed to the API server."""
692 return self._committed
693
694 @synchronized
696 """Recursively set committed flag.
697
698 If value is True, set committed to be True for this and all children.
699
700 If value is False, set committed to be False for this and all parents.
701 """
702 if value == self._committed:
703 return
704 if value:
705 for k,v in listitems(self._items):
706 v.set_committed(True)
707 self._committed = True
708 else:
709 self._committed = False
710 if self.parent is not None:
711 self.parent.set_committed(False)
712
713 @synchronized
715 """Iterate over names of files and collections contained in this collection."""
716 return iter(viewkeys(self._items))
717
718 @synchronized
720 """Get a file or collection that is directly contained by this collection.
721
722 If you want to search a path, use `find()` instead.
723
724 """
725 return self._items[k]
726
727 @synchronized
729 """Test if there is a file or collection a directly contained by this collection."""
730 return k in self._items
731
732 @synchronized
734 """Get the number of items directly contained in this collection."""
735 return len(self._items)
736
737 @must_be_writable
738 @synchronized
740 """Delete an item by name which is directly contained by this collection."""
741 del self._items[p]
742 self.set_committed(False)
743 self.notify(DEL, self, p, None)
744
745 @synchronized
747 """Get a list of names of files and collections directly contained in this collection."""
748 return self._items.keys()
749
750 @synchronized
752 """Get a list of files and collection objects directly contained in this collection."""
753 return listvalues(self._items)
754
755 @synchronized
757 """Get a list of (name, object) tuples directly contained in this collection."""
758 return listitems(self._items)
759
761 """Test if there is a file or collection at `path`."""
762 return self.find(path) is not None
763
@must_be_writable
@synchronized
def remove(self, path, recursive=False):
    """Remove the file or subcollection (directory) at `path`.

    :recursive:
      Specify whether to remove non-empty subcollections (True), or raise an error (False).

    Raises ArgumentError for an empty path, ENOENT when nothing exists
    at `path`, and ENOTEMPTY when asked to remove a non-empty
    subcollection without `recursive=True`.
    """

    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    pathcomponents = path.split("/", 1)
    item = self._items.get(pathcomponents[0])
    if item is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if len(pathcomponents) == 1:
        if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
            raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
        deleteditem = self._items[pathcomponents[0]]
        del self._items[pathcomponents[0]]
        self.set_committed(False)
        self.notify(DEL, self, pathcomponents[0], deleteditem)
    else:
        # Bug fix: forward `recursive` so that removing a non-empty
        # subdirectory deeper in the tree honors the caller's request
        # (previously the flag was silently dropped on recursion).
        item.remove(pathcomponents[1], recursive=recursive)
789
791 for k,v in listitems(source):
792 self._items[k] = v.clone(self, k)
793
796
@must_be_writable
@synchronized
def add(self, source_obj, target_name, overwrite=False, reparent=False):
    """Copy or move a file or subcollection into this collection.

    :source_obj:
      An ArvadosFile, or Subcollection object

    :target_name:
      Destination item name.  If the target name already exists and is
      a file, this raises an error unless `overwrite=True`.

    :overwrite:
      Whether to overwrite the target item if it already exists.

    :reparent:
      If True, source_obj is moved from its parent collection into this
      one.  If False (default), source_obj is copied and its parent
      collection is left unmodified.
    """

    exists = target_name in self
    if exists and not overwrite:
        raise IOError(errno.EEXIST, "File already exists", target_name)

    # Remember what we replaced so the notification can report it.
    modified_from = self[target_name] if exists else None

    if reparent:
        source_obj._reparent(self, target_name)
        item = source_obj
    else:
        item = source_obj.clone(self, target_name)

    self._items[target_name] = item
    self.set_committed(False)

    if modified_from:
        self.notify(MOD, self, target_name, (modified_from, item))
    else:
        self.notify(ADD, self, target_name, item)
840
def _get_src_target(self, source, target_path, source_collection, create_dest):
    """Resolve copy/rename arguments into (source_obj, target_dir, target_name).

    Shared helper for copy() and rename().  `source` may be a path
    string (looked up in `source_collection`, which defaults to self) or
    an actual ArvadosFile/Subcollection object.  When `create_dest` is
    True, missing intermediate target directories are created.
    """
    if source_collection is None:
        source_collection = self

    # Resolve the source object, keeping its path components when the
    # caller passed a path string.
    if isinstance(source, basestring):
        source_obj = source_collection.find(source)
        if source_obj is None:
            raise IOError(errno.ENOENT, "File not found", source)
        sourcecomponents = source.split("/")
    else:
        source_obj = source
        sourcecomponents = None

    targetcomponents = target_path.split("/")
    # An empty final component (trailing slash) means "keep the source name".
    target_name = targetcomponents[-1] or sourcecomponents[-1]

    if not target_name:
        raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

    if create_dest:
        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
    else:
        target_dir = (self.find("/".join(targetcomponents[0:-1]))
                      if len(targetcomponents) > 1 else self)

    if target_dir is None:
        raise IOError(errno.ENOENT, "Target directory not found", target_name)

    # Dropping a named source onto an existing directory target places it
    # inside that directory under the source's own name.
    if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
        target_dir = target_dir[target_name]
        target_name = sourcecomponents[-1]

    return (source_obj, target_dir, target_name)
880
@must_be_writable
@synchronized
def copy(self, source, target_path, source_collection=None, overwrite=False):
    """Copy a file or subcollection to a new path in this collection.

    :source:
      A string path to the source file or subcollection, or an actual
      ArvadosFile or Subcollection object.

    :target_path:
      Destination file or path.  If the target path already exists and
      is a subcollection, the item is placed inside it; if it exists
      and is a file, an error is raised unless `overwrite=True`.

    :source_collection:
      Collection to copy `source_path` from (default `self`)

    :overwrite:
      Whether to overwrite target file if it already exists.
    """

    resolved = self._get_src_target(source, target_path, source_collection, True)
    source_obj, target_dir, target_name = resolved
    target_dir.add(source_obj, target_name, overwrite, False)
904
@must_be_writable
@synchronized
def rename(self, source, target_path, source_collection=None, overwrite=False):
    """Move a file or subcollection from `source_collection` into this collection.

    :source:
      A string path to the source file or subcollection.

    :target_path:
      Destination file or path.  If the target path already exists and
      is a subcollection, the item is placed inside it; if it exists
      and is a file, an error is raised unless `overwrite=True`.

    :source_collection:
      Collection to copy `source_path` from (default `self`)

    :overwrite:
      Whether to overwrite target file if it already exists.
    """

    source_obj, target_dir, target_name = self._get_src_target(
        source, target_path, source_collection, False)
    # A move mutates the source's parent, so it must be writable.
    if not source_obj.writable():
        raise IOError(errno.EROFS, "Source collection is read only", source)
    target_dir.add(source_obj, target_name, overwrite, True)
930
def portable_manifest_text(self, stream_name="."):
    """Return a normalized manifest with access tokens stripped.

    Unlike manifest_text(), this does not flush outstanding blocks to
    Keep first.

    :stream_name:
      Name to use for this stream (directory)
    """
    # Delegate with strip=True, normalize=True.
    return self._get_manifest_text(stream_name, True, True)
942
@synchronized
def manifest_text(self, stream_name=".", strip=False, normalize=False,
                  only_committed=False):
    """Return the manifest text for this collection, subcollections and files.

    Unless `only_committed` is set, this flushes outstanding blocks to
    Keep first.  By default it neither normalizes an unmodified manifest
    nor strips access tokens.

    :stream_name:
      Name to use for this stream (directory)

    :strip:
      If True, remove signing tokens from block locators if present.
      If False (default), block locators are left unchanged.

    :normalize:
      If True, always export the manifest text in normalized form even
      if the Collection is not modified.  If False (default) and the
      collection is not modified, the original manifest text is
      returned even if it is not in normalized form.

    :only_committed:
      If True, don't commit pending blocks.
    """

    if only_committed:
        return self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=True)
    # Push any buffered data to Keep before rendering the manifest.
    self._my_block_manager().commit_all()
    return self._get_manifest_text(stream_name, strip, normalize,
                                   only_committed=only_committed)
973
@synchronized
def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
    """Get the manifest text for this collection, sub collections and files.

    :stream_name:
      Name to use for this stream (directory)

    :strip:
      If True, remove signing tokens from block locators if present.
      If False (default), block locators are left unchanged.

    :normalize:
      If True, always export the manifest text in normalized form
      even if the Collection is not modified. If False (default) and the collection
      is not modified, return the original manifest text even if it is not
      in normalized form.

    :only_committed:
      If True, only include blocks that were already committed to Keep.

    """

    if not self.committed() or self._manifest_text is None or normalize:
        # Modified (or normalization requested): rebuild the manifest
        # from in-memory file/segment state.
        stream = {}
        buf = []
        sorted_keys = sorted(self.keys())
        # Emit this stream's files first, in sorted name order.
        for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
            arvfile = self[filename]
            filestream = []
            for segment in arvfile.segments():
                loc = segment.locator
                if arvfile.parent._my_block_manager().is_bufferblock(loc):
                    # Segment still lives in an uncommitted buffer block:
                    # skip it when only committed data was requested,
                    # otherwise use the buffer block's current locator.
                    if only_committed:
                        continue
                    loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                if strip:
                    # Drop permission/signature hints from the locator.
                    loc = KeepLocator(loc).stripped()
                filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                  segment.segment_offset, segment.range_size))
            stream[filename] = filestream
        if stream:
            buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
        # Then recurse into subcollections (always normalized).
        for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
            buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
        return "".join(buf)
    else:
        # Unmodified and no normalization requested: reuse cached text.
        if strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text
1025
@synchronized
def diff(self, end_collection, prefix=".", holding_collection=None):
    """Generate list of add/modify/delete actions.

    When given to `apply`, will change `self` to match `end_collection`

    Returns tuples of the form (DEL, path, item), (ADD, path, item),
    (MOD, path, old, new) or (TOK, path, old, new).  Items are cloned
    into `holding_collection` so the recorded state is independent of
    later mutations of either collection.
    """
    changes = []
    if holding_collection is None:
        holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
    # Anything present here but absent from end_collection is a deletion.
    for k in self:
        if k not in end_collection:
            changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
    for k in end_collection:
        if k in self:
            if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                # Both sides are directories: recurse.
                changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
            elif end_collection[k] != self[k]:
                # Contents differ: record a modification with before/after.
                changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                # Items compare equal; still recorded as TOK (presumably a
                # token-only/no-op change — confirm against apply()'s use).
                changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
        else:
            # Present only in end_collection: an addition.
            changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
    return changes
1050
@must_be_writable
@synchronized
def apply(self, changes):
    """Apply changes from `diff`.

    If a change conflicts with a local change, it will be saved to an
    alternate path indicating the conflict.

    """
    if changes:
        self.set_committed(False)
    for change in changes:
        event_type = change[0]
        path = change[1]
        initial = change[2]
        local = self.find(path)
        # Timestamped alternate path used when a change collides with a
        # local modification.
        conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                time.gmtime()))
        if event_type == ADD:
            if local is None:
                # Nothing local in the way: apply the addition directly.
                self.copy(initial, path)
            elif local is not None and local != initial:
                # Added remotely but also created/changed locally: keep the
                # local item and park the incoming one at the conflict path.
                self.copy(initial, conflictpath)
        elif event_type == MOD or event_type == TOK:
            final = change[3]
            if local == initial:
                # Local item still matches the diff's starting state.
                if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                    # Both are files: update contents in place.
                    local.replace_contents(final)
                else:
                    # Type changed (file vs directory): replace wholesale.
                    self.copy(final, path, overwrite=True)
            else:
                # Local item diverged: save the incoming version at the
                # conflict path instead of clobbering local changes.
                self.copy(final, conflictpath)
        elif event_type == DEL:
            if local == initial:
                # Unchanged locally: carry out the deletion. A locally
                # modified item is implicitly kept.
                self.remove(path, recursive=True)
1097
1098
1099
1101 """Get the portable data hash for this collection's manifest."""
1102 if self._manifest_locator and self.committed():
1103
1104
1105 return self._portable_data_hash
1106 else:
1107 stripped = self.portable_manifest_text().encode()
1108 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1109
1110 @synchronized
1112 if self._callback is None:
1113 self._callback = callback
1114 else:
1115 raise errors.ArgumentError("A callback is already set on this collection.")
1116
1117 @synchronized
1119 if self._callback is not None:
1120 self._callback = None
1121
1122 @synchronized
1123 - def notify(self, event, collection, name, item):
1127
1128 @synchronized
1130 if other is self:
1131 return True
1132 if not isinstance(other, RichCollectionBase):
1133 return False
1134 if len(self._items) != len(other):
1135 return False
1136 for k in self._items:
1137 if k not in other:
1138 return False
1139 if self._items[k] != other[k]:
1140 return False
1141 return True
1142
1144 return not self.__eq__(other)
1145
1146 @synchronized
1148 """Flush bufferblocks to Keep."""
1149 for e in listvalues(self):
1150 e.flush()
1151
1154 """Represents the root of an Arvados Collection.
1155
1156 This class is threadsafe. The root collection object, all subcollections
1157 and files are protected by a single lock (i.e. each access locks the entire
1158 collection).
1159
1160 Brief summary of
1161 useful methods:
1162
1163 :To read an existing file:
1164 `c.open("myfile", "r")`
1165
1166 :To write a new file:
1167 `c.open("myfile", "w")`
1168
1169 :To determine if a file exists:
1170 `c.find("myfile") is not None`
1171
1172 :To copy a file:
1173 `c.copy("source", "dest")`
1174
1175 :To delete a file:
1176 `c.remove("myfile")`
1177
1178 :To save to an existing collection record:
1179 `c.save()`
1180
1181 :To save a new collection record:
1182 `c.save_new()`
1183
1184 :To merge remote changes into this object:
1185 `c.update()`
1186
1187 Must be associated with an API server Collection record (during
1188 initialization, or using `save_new`) to use `save` or `update`
1189
1190 """
1191
1192 - def __init__(self, manifest_locator_or_text=None,
1193 api_client=None,
1194 keep_client=None,
1195 num_retries=None,
1196 parent=None,
1197 apiconfig=None,
1198 block_manager=None,
1199 replication_desired=None,
1200 put_threads=None):
1201 """Collection constructor.
1202
1203 :manifest_locator_or_text:
1204 An Arvados collection UUID, portable data hash, raw manifest
1205 text, or (if creating an empty collection) None.
1206
1207 :parent:
1208 the parent Collection, may be None.
1209
1210 :apiconfig:
1211 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1212 Prefer this over supplying your own api_client and keep_client (except in testing).
1213 Will use default config settings if not specified.
1214
1215 :api_client:
1216 The API client object to use for requests. If not specified, create one using `apiconfig`.
1217
1218 :keep_client:
1219 the Keep client to use for requests. If not specified, create one using `apiconfig`.
1220
1221 :num_retries:
1222 the number of retries for API and Keep requests.
1223
1224 :block_manager:
1225 the block manager to use. If not specified, create one.
1226
1227 :replication_desired:
1228 How many copies should Arvados maintain. If None, API server default
1229 configuration applies. If not None, this value will also be used
1230 for determining the number of block copies being written.
1231
1232 """
1233 super(Collection, self).__init__(parent)
1234 self._api_client = api_client
1235 self._keep_client = keep_client
1236 self._block_manager = block_manager
1237 self.replication_desired = replication_desired
1238 self.put_threads = put_threads
1239
1240 if apiconfig:
1241 self._config = apiconfig
1242 else:
1243 self._config = config.settings()
1244
1245 self.num_retries = num_retries if num_retries is not None else 0
1246 self._manifest_locator = None
1247 self._manifest_text = None
1248 self._portable_data_hash = None
1249 self._api_response = None
1250 self._past_versions = set()
1251
1252 self.lock = threading.RLock()
1253 self.events = None
1254
1255 if manifest_locator_or_text:
1256 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1257 self._manifest_locator = manifest_locator_or_text
1258 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1259 self._manifest_locator = manifest_locator_or_text
1260 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1261 self._manifest_text = manifest_locator_or_text
1262 else:
1263 raise errors.ArgumentError(
1264 "Argument to CollectionReader is not a manifest or a collection UUID")
1265
1266 try:
1267 self._populate()
1268 except (IOError, errors.SyntaxError) as e:
1269 raise errors.ArgumentError("Error processing manifest text: %s", e)
1270
1273
1275 if self._api_response and self._api_response["properties"]:
1276 return self._api_response["properties"]
1277 else:
1278 return {}
1279
1281 if self._api_response and self._api_response["trash_at"]:
1282 return ciso8601.parse_datetime(self._api_response["trash_at"])
1283 else:
1284 return None
1285
1288
1291
1292 @synchronized
1294 return modified_at_and_portable_data_hash in self._past_versions
1295
1296 @synchronized
1297 @retry_method
1298 - def update(self, other=None, num_retries=None):
1318
1319 @synchronized
1321 if self._api_client is None:
1322 self._api_client = ThreadSafeApiCache(self._config)
1323 if self._keep_client is None:
1324 self._keep_client = self._api_client.keep
1325 return self._api_client
1326
1327 @synchronized
1329 if self._keep_client is None:
1330 if self._api_client is None:
1331 self._my_api()
1332 else:
1333 self._keep_client = KeepClient(api_client=self._api_client)
1334 return self._keep_client
1335
1336 @synchronized
1338 if self._block_manager is None:
1339 copies = (self.replication_desired or
1340 self._my_api()._rootDesc.get('defaultCollectionReplication',
1341 2))
1342 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1343 return self._block_manager
1344
1348
1350
1351
1352
1353
1354
1355
1356
1357 self._remember_api_response(self._my_api().collections().get(
1358 uuid=self._manifest_locator).execute(
1359 num_retries=self.num_retries))
1360 self._manifest_text = self._api_response['manifest_text']
1361 self._portable_data_hash = self._api_response['portable_data_hash']
1362
1363
1364 if self.replication_desired is None:
1365 self.replication_desired = self._api_response.get('replication_desired', None)
1366
1368 if self._manifest_text is None:
1369 if self._manifest_locator is None:
1370 return
1371 else:
1372 self._populate_from_api_server()
1373 self._baseline_manifest = self._manifest_text
1374 self._import_manifest(self._manifest_text)
1375
1378
1381
1382 - def __exit__(self, exc_type, exc_value, traceback):
1388
1390 if self._block_manager is not None:
1391 self._block_manager.stop_threads()
1392
1393 @synchronized
1395 """Get the manifest locator, if any.
1396
1397 The manifest locator will be set when the collection is loaded from an
1398 API server record or the portable data hash of a manifest.
1399
1400 The manifest locator will be None if the collection is newly created or
1401 was created directly from manifest text. The method `save_new()` will
1402 assign a manifest locator.
1403
1404 """
1405 return self._manifest_locator
1406
1407 @synchronized
1408 - def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1409 if new_config is None:
1410 new_config = self._config
1411 if readonly:
1412 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1413 else:
1414 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1415
1416 newcollection._clonefrom(self)
1417 return newcollection
1418
1419 @synchronized
1421 """Returns information about this Collection fetched from the API server.
1422
1423 If the Collection exists in Keep but not the API server, currently
1424 returns None. Future versions may provide a synthetic response.
1425
1426 """
1427 return self._api_response
1428
1430 """See `RichCollectionBase.find_or_create`"""
1431 if path == ".":
1432 return self
1433 else:
1434 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1435
1436 - def find(self, path):
1437 """See `RichCollectionBase.find`"""
1438 if path == ".":
1439 return self
1440 else:
1441 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1442
1443 - def remove(self, path, recursive=False):
1444 """See `RichCollectionBase.remove`"""
1445 if path == ".":
1446 raise errors.ArgumentError("Cannot remove '.'")
1447 else:
1448 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1449
1450 @must_be_writable
1451 @synchronized
1452 @retry_method
1453 - def save(self,
1454 properties=None,
1455 storage_classes=None,
1456 trash_at=None,
1457 merge=True,
1458 num_retries=None):
1459 """Save collection to an existing collection record.
1460
1461 Commit pending buffer blocks to Keep, merge with remote record (if
1462 merge=True, the default), and update the collection record. Returns
1463 the current manifest text.
1464
1465 Will raise AssertionError if not associated with a collection record on
1466 the API server. If you want to save a manifest to Keep only, see
1467 `save_new()`.
1468
1469 :properties:
1470 Additional properties of collection. This value will replace any existing
1471 properties of collection.
1472
1473 :storage_classes:
1474 Specify desirable storage classes to be used when writing data to Keep.
1475
1476 :trash_at:
1477 A collection is *expiring* when it has a *trash_at* time in the future.
1478 An expiring collection can be accessed as normal,
1479 but is scheduled to be trashed automatically at the *trash_at* time.
1480
1481 :merge:
1482 Update and merge remote changes before saving. Otherwise, any
1483 remote changes will be ignored and overwritten.
1484
1485 :num_retries:
1486 Retry count on API calls (if None, use the collection default)
1487
1488 """
1489 if properties and type(properties) is not dict:
1490 raise errors.ArgumentError("properties must be dictionary type.")
1491
1492 if storage_classes and type(storage_classes) is not list:
1493 raise errors.ArgumentError("storage_classes must be list type.")
1494
1495 if trash_at and type(trash_at) is not datetime.datetime:
1496 raise errors.ArgumentError("trash_at must be datetime type.")
1497
1498 body={}
1499 if properties:
1500 body["properties"] = properties
1501 if storage_classes:
1502 body["storage_classes_desired"] = storage_classes
1503 if trash_at:
1504 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1505 body["trash_at"] = t
1506
1507 if not self.committed():
1508 if not self._has_collection_uuid():
1509 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1510
1511 self._my_block_manager().commit_all()
1512
1513 if merge:
1514 self.update()
1515
1516 text = self.manifest_text(strip=False)
1517 body['manifest_text'] = text
1518
1519 self._remember_api_response(self._my_api().collections().update(
1520 uuid=self._manifest_locator,
1521 body=body
1522 ).execute(num_retries=num_retries))
1523 self._manifest_text = self._api_response["manifest_text"]
1524 self._portable_data_hash = self._api_response["portable_data_hash"]
1525 self.set_committed(True)
1526 elif body:
1527 self._remember_api_response(self._my_api().collections().update(
1528 uuid=self._manifest_locator,
1529 body=body
1530 ).execute(num_retries=num_retries))
1531
1532 return self._manifest_text
1533
1534
1535 @must_be_writable
1536 @synchronized
1537 @retry_method
1538 - def save_new(self, name=None,
1539 create_collection_record=True,
1540 owner_uuid=None,
1541 properties=None,
1542 storage_classes=None,
1543 trash_at=None,
1544 ensure_unique_name=False,
1545 num_retries=None):
1546 """Save collection to a new collection record.
1547
1548 Commit pending buffer blocks to Keep and, when create_collection_record
1549 is True (default), create a new collection record. After creating a
1550 new collection record, this Collection object will be associated with
1551 the new record used by `save()`. Returns the current manifest text.
1552
1553 :name:
1554 The collection name.
1555
1556 :create_collection_record:
1557 If True, create a collection record on the API server.
1558 If False, only commit blocks to Keep and return the manifest text.
1559
1560 :owner_uuid:
1561 the user, or project uuid that will own this collection.
1562 If None, defaults to the current user.
1563
1564 :properties:
1565 Additional properties of collection. This value will replace any existing
1566 properties of collection.
1567
1568 :storage_classes:
1569 Specify desirable storage classes to be used when writing data to Keep.
1570
1571 :trash_at:
1572 A collection is *expiring* when it has a *trash_at* time in the future.
1573 An expiring collection can be accessed as normal,
1574 but is scheduled to be trashed automatically at the *trash_at* time.
1575
1576 :ensure_unique_name:
1577 If True, ask the API server to rename the collection
1578 if it conflicts with a collection with the same name and owner. If
1579 False, a name conflict will result in an error.
1580
1581 :num_retries:
1582 Retry count on API calls (if None, use the collection default)
1583
1584 """
1585 if properties and type(properties) is not dict:
1586 raise errors.ArgumentError("properties must be dictionary type.")
1587
1588 if storage_classes and type(storage_classes) is not list:
1589 raise errors.ArgumentError("storage_classes must be list type.")
1590
1591 if trash_at and type(trash_at) is not datetime.datetime:
1592 raise errors.ArgumentError("trash_at must be datetime type.")
1593
1594 self._my_block_manager().commit_all()
1595 text = self.manifest_text(strip=False)
1596
1597 if create_collection_record:
1598 if name is None:
1599 name = "New collection"
1600 ensure_unique_name = True
1601
1602 body = {"manifest_text": text,
1603 "name": name,
1604 "replication_desired": self.replication_desired}
1605 if owner_uuid:
1606 body["owner_uuid"] = owner_uuid
1607 if properties:
1608 body["properties"] = properties
1609 if storage_classes:
1610 body["storage_classes_desired"] = storage_classes
1611 if trash_at:
1612 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1613 body["trash_at"] = t
1614
1615 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1616 text = self._api_response["manifest_text"]
1617
1618 self._manifest_locator = self._api_response["uuid"]
1619 self._portable_data_hash = self._api_response["portable_data_hash"]
1620
1621 self._manifest_text = text
1622 self.set_committed(True)
1623
1624 return text
1625
    # One manifest token plus its trailing separator (run of spaces, or EOL).
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # A Keep block locator: 32-hex-digit md5, "+size", then optional "+hint"s.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # A manifest file segment token: "position:size:filename".
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1629
1631 return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1632
    @synchronized
        """Import a manifest into a `Collection`.

        :manifest_text:
          The manifest text to import from.

        """
        if len(self) > 0:
            # NOTE(review): bare `ArgumentError` is not defined in this
            # module; this probably means `errors.ArgumentError` and would
            # raise NameError if this branch were ever taken -- confirm.
            raise ArgumentError("Can only import manifest into an empty collection")

        # Parser states: each manifest line is a stream name, followed by
        # block locators, followed by file segments, ending at a newline.
        STREAM_NAME = 0
        BLOCKS = 1
        SEGMENTS = 2

        stream_name = None
        state = STREAM_NAME

        for token_and_separator in self._token_re.finditer(manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # First token on a line names the stream (a directory path).
                stream_name = self._unescape_manifest_path(tok)
                blocks = []
                segments = []
                streamoffset = 0
                state = BLOCKS
                self.find_or_create(stream_name, COLLECTION)
                continue

            if state == BLOCKS:
                block_locator = self._block_re.match(tok)
                if block_locator:
                    # Accumulate block locators; streamoffset tracks where
                    # each block starts within the concatenated stream.
                    blocksize = int(block_locator.group(1))
                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                    streamoffset += blocksize
                else:
                    # First non-locator token switches to segment parsing;
                    # fall through so this same token is parsed as a segment.
                    state = SEGMENTS

            if state == SEGMENTS:
                file_segment = self._segment_re.match(tok)
                if file_segment:
                    pos = int(file_segment.group(1))
                    size = int(file_segment.group(2))
                    name = self._unescape_manifest_path(file_segment.group(3))
                    if name.split('/')[-1] == '.':
                        # A trailing "/." denotes an explicit (possibly
                        # empty) subdirectory rather than a file.
                        if len(name) > 2:
                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                    else:
                        filepath = os.path.join(stream_name, name)
                        afile = self.find_or_create(filepath, FILE)
                        if isinstance(afile, ArvadosFile):
                            afile.add_segment(blocks, pos, size)
                        else:
                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                else:
                    # Token matched neither a block locator nor a segment.
                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

            if sep == "\n":
                # End of line: the next token starts a new stream.
                stream_name = None
                state = STREAM_NAME

        self.set_committed(True)
1700
1701 @synchronized
1702 - def notify(self, event, collection, name, item):
1703 if self._callback:
1704 self._callback(event, collection, name, item)
1705
1708 """This is a subdirectory within a collection that doesn't have its own API
1709 server record.
1710
1711 Subcollection locking falls under the umbrella lock of its root collection.
1712
1713 """
1714
1721
1724
1727
1730
1733
1736
1739
1740 @synchronized
1741 - def clone(self, new_parent, new_name):
1745
1746 @must_be_writable
1747 @synchronized
1755
1758 """A read-only collection object.
1759
1760 Initialize from a collection UUID or portable data hash, or raw
1761 manifest text. See `Collection` constructor for detailed options.
1762
1763 """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        """Initialize a read-only view of a collection.

        Takes the same arguments as the `Collection` constructor, except
        that `manifest_locator_or_text` is required rather than optional.
        """
        # _in_init is True only while the base constructor runs --
        # presumably consulted by a writability check so the collection can
        # be populated during construction; verify against the base class.
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # The base class installs an RLock; a read-only collection is never
        # mutated after construction, so a no-op lock suffices.
        self.lock = NoopLock()

        # Backwards-compatible stream cache used by all_streams()/all_files();
        # populated lazily by the _populate_streams decorator.
        self._streams = None
1777 return self._in_init
1778
    @functools.wraps(orig_func)
    def populate_streams_wrapper(self, *args, **kwargs):
        # Lazily build the legacy stream list on first use, then delegate
        # to the wrapped method.
        if self._streams is None:
            if self._manifest_text:
                # One entry per non-empty manifest line, split into tokens.
                self._streams = [sline.split()
                                 for sline in self._manifest_text.split("\n")
                                 if sline]
            else:
                self._streams = []
        return orig_func(self, *args, **kwargs)
    return populate_streams_wrapper
1792
    @_populate_streams
        """Normalize the streams returned by `all_streams`.

        This method is kept for backwards compatibility and only affects the
        behavior of `all_streams()` and `all_files()`

        """
        # Regroup every file's data by the directory (stream) that should
        # own it after normalization.
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    # Translate each file segment into (locator, range)
                    # pairs resolved against the stream's block list.
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        # Rebuild the cached stream list in normalized, sorted order.
        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]
1816 @_populate_streams
1818 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1819 for s in self._streams]
1820
1821 @_populate_streams
1826