arvados.collection

Tools to work with Arvados collections

This module provides high-level interfaces to create, read, and update Arvados collections. Most users will want to instantiate Collection objects, and use methods like Collection.open and Collection.mkdirs to read and write data in the collection. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4"""Tools to work with Arvados collections
   5
   6This module provides high-level interfaces to create, read, and update
   7Arvados collections. Most users will want to instantiate `Collection`
   8objects, and use methods like `Collection.open` and `Collection.mkdirs` to
   9read and write data in the collection. Refer to the Arvados Python SDK
  10cookbook for [an introduction to using the Collection class][cookbook].
  11
  12[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
  13"""
  14
  15from __future__ import absolute_import
  16from future.utils import listitems, listvalues, viewkeys
  17from builtins import str
  18from past.builtins import basestring
  19from builtins import object
  20import ciso8601
  21import datetime
  22import errno
  23import functools
  24import hashlib
  25import io
  26import logging
  27import os
  28import re
  29import sys
  30import threading
  31import time
  32
  33from collections import deque
  34from stat import *
  35
  36from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
  37from .keep import KeepLocator, KeepClient
  38from .stream import StreamReader
  39from ._normalize_stream import normalize_stream, escape
  40from ._ranges import Range, LocatorAndRange
  41from .safeapi import ThreadSafeApiCache
  42import arvados.config as config
  43import arvados.errors as errors
  44import arvados.util
  45import arvados.events as events
  46from arvados.retry import retry_method
  47
  48from typing import (
  49    Any,
  50    Callable,
  51    Dict,
  52    IO,
  53    Iterator,
  54    List,
  55    Mapping,
  56    Optional,
  57    Tuple,
  58    Union,
  59)
  60
  61if sys.version_info < (3, 8):
  62    from typing_extensions import Literal
  63else:
  64    from typing import Literal
  65
_logger = logging.getLogger('arvados.collection')

ADD = "add"
"""Argument value for `Collection` methods to represent an added item"""
DEL = "del"
"""Argument value for `Collection` methods to represent a removed item"""
MOD = "mod"
"""Argument value for `Collection` methods to represent a modified item"""
TOK = "tok"
"""Argument value for `Collection` methods to represent an item with token differences"""
FILE = "file"
"""`create_type` value for `Collection.find_or_create`"""
COLLECTION = "collection"
"""`create_type` value for `Collection.find_or_create`"""

# A list of (event, path, collection[, collection]) tuples describing
# differences between collections.
ChangeList = List[Union[
    Tuple[Literal[ADD, DEL], str, 'Collection'],
    Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
]]
# One of the ADD/DEL/MOD/TOK constants above.
ChangeType = Literal[ADD, DEL, MOD, TOK]
# Anything that can appear as an item inside a collection stream.
CollectionItem = Union[ArvadosFile, 'Collection']
# Signature of callbacks notified of collection changes (see `notify` calls below).
ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
# Valid `create_type` values for `find_or_create`.
CreateType = Literal[COLLECTION, FILE]
# Collection properties: an arbitrary string-keyed mapping.
Properties = Dict[str, Any]
# Storage classes are named by plain strings.
StorageClasses = List[str]
  91
  92class CollectionBase(object):
  93    """Abstract base class for Collection classes
  94
  95    .. ATTENTION:: Internal
  96       This class is meant to be used by other parts of the SDK. User code
  97       should instantiate or subclass `Collection` or one of its subclasses
  98       directly.
  99    """
 100
 101    def __enter__(self):
 102        """Enter a context block with this collection instance"""
 103        return self
 104
 105    def __exit__(self, exc_type, exc_value, traceback):
 106        """Exit a context block with this collection instance"""
 107        pass
 108
 109    def _my_keep(self):
 110        if self._keep_client is None:
 111            self._keep_client = KeepClient(api_client=self._api_client,
 112                                           num_retries=self.num_retries)
 113        return self._keep_client
 114
 115    def stripped_manifest(self) -> str:
 116        """Create a copy of the collection manifest with only size hints
 117
 118        This method returns a string with the current collection's manifest
 119        text with all non-portable locator hints like permission hints and
 120        remote cluster hints removed. The only hints in the returned manifest
 121        will be size hints.
 122        """
 123        raw = self.manifest_text()
 124        clean = []
 125        for line in raw.split("\n"):
 126            fields = line.split()
 127            if fields:
 128                clean_fields = fields[:1] + [
 129                    (re.sub(r'\+[^\d][^\+]*', '', x)
 130                     if re.match(arvados.util.keep_locator_pattern, x)
 131                     else x)
 132                    for x in fields[1:]]
 133                clean += [' '.join(clean_fields), "\n"]
 134        return ''.join(clean)
 135
 136
class _WriterFile(_FileLikeObjectBase):
    """Write-only file-like object that streams data into a collection writer

    Thin delegation wrapper: all data is forwarded to `coll_writer`, and
    closing the file tells the writer to finish the current file.
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        # Destination writer that receives all written data.
        self.dest = coll_writer

    def close(self):
        # Mark the file closed first, then finalize it in the destination.
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()
 158
 159
 160class RichCollectionBase(CollectionBase):
 161    """Base class for Collection classes
 162
 163    .. ATTENTION:: Internal
 164       This class is meant to be used by other parts of the SDK. User code
 165       should instantiate or subclass `Collection` or one of its subclasses
 166       directly.
 167    """
 168
    def __init__(self, parent=None):
        """Initialize an empty collection node

        Arguments:

        * parent: RichCollectionBase | None --- The collection that contains
          this stream, or `None` for a root collection.
        """
        self.parent = parent
        # True once this collection matches a record saved to the API server.
        self._committed = False
        # Cached flag: True once a remote (+R) block locator has been seen.
        self._has_remote_blocks = False
        # Change-notification callback; None until one is registered.
        self._callback = None
        # Maps item names to ArvadosFile or Subcollection objects.
        self._items = {}
 175
    def _my_api(self):
        # Abstract hook: subclasses must return the Arvados API client to use.
        raise NotImplementedError()

    def _my_keep(self):
        # Abstract hook: subclasses must return the KeepClient to use.
        raise NotImplementedError()

    def _my_block_manager(self):
        # Abstract hook: subclasses must return the _BlockManager used to
        # buffer and commit data blocks for this collection.
        raise NotImplementedError()

    def writable(self) -> bool:
        """Indicate whether this collection object can be modified

        This method returns `False` if this object is a `CollectionReader`,
        else `True`. Subclasses must override it.
        """
        raise NotImplementedError()

    def root_collection(self) -> 'Collection':
        """Get this collection's root collection object

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the source Collection object.
        Subclasses must override it.
        """
        raise NotImplementedError()

    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this collection

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the name of the stream you opened.
        Subclasses must override it.
        """
        raise NotImplementedError()
 208
 209    @synchronized
 210    def has_remote_blocks(self) -> bool:
 211        """Indiciate whether the collection refers to remote data
 212
 213        Returns `True` if the collection manifest includes any Keep locators
 214        with a remote hint (`+R`), else `False`.
 215        """
 216        if self._has_remote_blocks:
 217            return True
 218        for item in self:
 219            if self[item].has_remote_blocks():
 220                return True
 221        return False
 222
 223    @synchronized
 224    def set_has_remote_blocks(self, val: bool) -> None:
 225        """Cache whether this collection refers to remote blocks
 226
 227        .. ATTENTION:: Internal
 228           This method is only meant to be used by other Collection methods.
 229
 230        Set this collection's cached "has remote blocks" flag to the given
 231        value.
 232        """
 233        self._has_remote_blocks = val
 234        if self.parent:
 235            self.parent.set_has_remote_blocks(val)
 236
 237    @must_be_writable
 238    @synchronized
 239    def find_or_create(
 240            self,
 241            path: str,
 242            create_type: CreateType,
 243    ) -> CollectionItem:
 244        """Get the item at the given path, creating it if necessary
 245
 246        If `path` refers to a stream in this collection, returns a
 247        corresponding `Subcollection` object. If `path` refers to a file in
 248        this collection, returns a corresponding
 249        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 250        this collection, then this method creates a new object and returns
 251        it, creating parent streams as needed. The type of object created is
 252        determined by the value of `create_type`.
 253
 254        Arguments:
 255
 256        * path: str --- The path to find or create within this collection.
 257
 258        * create_type: Literal[COLLECTION, FILE] --- The type of object to
 259          create at `path` if one does not exist. Passing `COLLECTION`
 260          creates a stream and returns the corresponding
 261          `Subcollection`. Passing `FILE` creates a new file and returns the
 262          corresponding `arvados.arvfile.ArvadosFile`.
 263        """
 264        pathcomponents = path.split("/", 1)
 265        if pathcomponents[0]:
 266            item = self._items.get(pathcomponents[0])
 267            if len(pathcomponents) == 1:
 268                if item is None:
 269                    # create new file
 270                    if create_type == COLLECTION:
 271                        item = Subcollection(self, pathcomponents[0])
 272                    else:
 273                        item = ArvadosFile(self, pathcomponents[0])
 274                    self._items[pathcomponents[0]] = item
 275                    self.set_committed(False)
 276                    self.notify(ADD, self, pathcomponents[0], item)
 277                return item
 278            else:
 279                if item is None:
 280                    # create new collection
 281                    item = Subcollection(self, pathcomponents[0])
 282                    self._items[pathcomponents[0]] = item
 283                    self.set_committed(False)
 284                    self.notify(ADD, self, pathcomponents[0], item)
 285                if isinstance(item, RichCollectionBase):
 286                    return item.find_or_create(pathcomponents[1], create_type)
 287                else:
 288                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 289        else:
 290            return self
 291
 292    @synchronized
 293    def find(self, path: str) -> CollectionItem:
 294        """Get the item at the given path
 295
 296        If `path` refers to a stream in this collection, returns a
 297        corresponding `Subcollection` object. If `path` refers to a file in
 298        this collection, returns a corresponding
 299        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 300        this collection, then this method raises `NotADirectoryError`.
 301
 302        Arguments:
 303
 304        * path: str --- The path to find or create within this collection.
 305        """
 306        if not path:
 307            raise errors.ArgumentError("Parameter 'path' is empty.")
 308
 309        pathcomponents = path.split("/", 1)
 310        if pathcomponents[0] == '':
 311            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 312
 313        item = self._items.get(pathcomponents[0])
 314        if item is None:
 315            return None
 316        elif len(pathcomponents) == 1:
 317            return item
 318        else:
 319            if isinstance(item, RichCollectionBase):
 320                if pathcomponents[1]:
 321                    return item.find(pathcomponents[1])
 322                else:
 323                    return item
 324            else:
 325                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 326
 327    @synchronized
 328    def mkdirs(self, path: str) -> 'Subcollection':
 329        """Create and return a subcollection at `path`
 330
 331        If `path` exists within this collection, raises `FileExistsError`.
 332        Otherwise, creates a stream at that path and returns the
 333        corresponding `Subcollection`.
 334        """
 335        if self.find(path) != None:
 336            raise IOError(errno.EEXIST, "Directory or file exists", path)
 337
 338        return self.find_or_create(path, COLLECTION)
 339
 340    def open(
 341            self,
 342            path: str,
 343            mode: str="r",
 344            encoding: Optional[str]=None
 345    ) -> IO:
 346        """Open a file-like object within the collection
 347
 348        This method returns a file-like object that can read and/or write the
 349        file located at `path` within the collection. If you attempt to write
 350        a `path` that does not exist, the file is created with `find_or_create`.
 351        If the file cannot be opened for any other reason, this method raises
 352        `OSError` with an appropriate errno.
 353
 354        Arguments:
 355
 356        * path: str --- The path of the file to open within this collection
 357
 358        * mode: str --- The mode to open this file. Supports all the same
 359          values as `builtins.open`.
 360
 361        * encoding: str | None --- The text encoding of the file. Only used
 362          when the file is opened in text mode. The default is
 363          platform-dependent.
 364
 365        """
 366        if not re.search(r'^[rwa][bt]?\+?$', mode):
 367            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 368
 369        if mode[0] == 'r' and '+' not in mode:
 370            fclass = ArvadosFileReader
 371            arvfile = self.find(path)
 372        elif not self.writable():
 373            raise IOError(errno.EROFS, "Collection is read only")
 374        else:
 375            fclass = ArvadosFileWriter
 376            arvfile = self.find_or_create(path, FILE)
 377
 378        if arvfile is None:
 379            raise IOError(errno.ENOENT, "File not found", path)
 380        if not isinstance(arvfile, ArvadosFile):
 381            raise IOError(errno.EISDIR, "Is a directory", path)
 382
 383        if mode[0] == 'w':
 384            arvfile.truncate(0)
 385
 386        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
 387        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
 388        if 'b' not in mode:
 389            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
 390            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
 391        return f
 392
 393    def modified(self) -> bool:
 394        """Indicate whether this collection has an API server record
 395
 396        Returns `False` if this collection corresponds to a record loaded from
 397        the API server, `True` otherwise.
 398        """
 399        return not self.committed()
 400
 401    @synchronized
 402    def committed(self):
 403        """Indicate whether this collection has an API server record
 404
 405        Returns `True` if this collection corresponds to a record loaded from
 406        the API server, `False` otherwise.
 407        """
 408        return self._committed
 409
 410    @synchronized
 411    def set_committed(self, value: bool=True):
 412        """Cache whether this collection has an API server record
 413
 414        .. ATTENTION:: Internal
 415           This method is only meant to be used by other Collection methods.
 416
 417        Set this collection's cached "committed" flag to the given
 418        value and propagates it as needed.
 419        """
 420        if value == self._committed:
 421            return
 422        if value:
 423            for k,v in listitems(self._items):
 424                v.set_committed(True)
 425            self._committed = True
 426        else:
 427            self._committed = False
 428            if self.parent is not None:
 429                self.parent.set_committed(False)
 430
 431    @synchronized
 432    def __iter__(self) -> Iterator[str]:
 433        """Iterate names of streams and files in this collection
 434
 435        This method does not recurse. It only iterates the contents of this
 436        collection's corresponding stream.
 437        """
 438        return iter(viewkeys(self._items))
 439
 440    @synchronized
 441    def __getitem__(self, k: str) -> CollectionItem:
 442        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
 443
 444        This method does not recurse. If you want to search a path, use
 445        `RichCollectionBase.find` instead.
 446        """
 447        return self._items[k]
 448
 449    @synchronized
 450    def __contains__(self, k: str) -> bool:
 451        """Indicate whether this collection has an item with this name
 452
 453        This method does not recurse. It you want to check a path, use
 454        `RichCollectionBase.exists` instead.
 455        """
 456        return k in self._items
 457
 458    @synchronized
 459    def __len__(self):
 460        """Get the number of items directly contained in this collection
 461
 462        This method does not recurse. It only counts the streams and files
 463        in this collection's corresponding stream.
 464        """
 465        return len(self._items)
 466
 467    @must_be_writable
 468    @synchronized
 469    def __delitem__(self, p: str) -> None:
 470        """Delete an item from this collection's stream
 471
 472        This method does not recurse. If you want to remove an item by a
 473        path, use `RichCollectionBase.remove` instead.
 474        """
 475        del self._items[p]
 476        self.set_committed(False)
 477        self.notify(DEL, self, p, None)
 478
 479    @synchronized
 480    def keys(self) -> Iterator[str]:
 481        """Iterate names of streams and files in this collection
 482
 483        This method does not recurse. It only iterates the contents of this
 484        collection's corresponding stream.
 485        """
 486        return self._items.keys()
 487
 488    @synchronized
 489    def values(self) -> List[CollectionItem]:
 490        """Get a list of objects in this collection's stream
 491
 492        The return value includes a `Subcollection` for every stream, and an
 493        `arvados.arvfile.ArvadosFile` for every file, directly within this
 494        collection's stream.  This method does not recurse.
 495        """
 496        return listvalues(self._items)
 497
 498    @synchronized
 499    def items(self) -> List[Tuple[str, CollectionItem]]:
 500        """Get a list of `(name, object)` tuples from this collection's stream
 501
 502        The return value includes a `Subcollection` for every stream, and an
 503        `arvados.arvfile.ArvadosFile` for every file, directly within this
 504        collection's stream.  This method does not recurse.
 505        """
 506        return listitems(self._items)
 507
 508    def exists(self, path: str) -> bool:
 509        """Indicate whether this collection includes an item at `path`
 510
 511        This method returns `True` if `path` refers to a stream or file within
 512        this collection, else `False`.
 513
 514        Arguments:
 515
 516        * path: str --- The path to check for existence within this collection
 517        """
 518        return self.find(path) is not None
 519
 520    @must_be_writable
 521    @synchronized
 522    def remove(self, path: str, recursive: bool=False) -> None:
 523        """Remove the file or stream at `path`
 524
 525        Arguments:
 526
 527        * path: str --- The path of the item to remove from the collection
 528
 529        * recursive: bool --- Controls the method's behavior if `path` refers
 530          to a nonempty stream. If `False` (the default), this method raises
 531          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
 532          items under the stream.
 533        """
 534        if not path:
 535            raise errors.ArgumentError("Parameter 'path' is empty.")
 536
 537        pathcomponents = path.split("/", 1)
 538        item = self._items.get(pathcomponents[0])
 539        if item is None:
 540            raise IOError(errno.ENOENT, "File not found", path)
 541        if len(pathcomponents) == 1:
 542            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
 543                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
 544            deleteditem = self._items[pathcomponents[0]]
 545            del self._items[pathcomponents[0]]
 546            self.set_committed(False)
 547            self.notify(DEL, self, pathcomponents[0], deleteditem)
 548        else:
 549            item.remove(pathcomponents[1], recursive=recursive)
 550
    def _clonefrom(self, source):
        # Deep-copy every item from `source` into this collection, making
        # this instance the new parent of each cloned item.
        for k,v in listitems(source):
            self._items[k] = v.clone(self, k)

    def clone(self):
        # Abstract hook: subclasses return a copy of this collection object.
        raise NotImplementedError()
 557
 558    @must_be_writable
 559    @synchronized
 560    def add(
 561            self,
 562            source_obj: CollectionItem,
 563            target_name: str,
 564            overwrite: bool=False,
 565            reparent: bool=False,
 566    ) -> None:
 567        """Copy or move a file or subcollection object to this collection
 568
 569        Arguments:
 570
 571        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
 572          to add to this collection
 573
 574        * target_name: str --- The path inside this collection where
 575          `source_obj` should be added.
 576
 577        * overwrite: bool --- Controls the behavior of this method when the
 578          collection already contains an object at `target_name`. If `False`
 579          (the default), this method will raise `FileExistsError`. If `True`,
 580          the object at `target_name` will be replaced with `source_obj`.
 581
 582        * reparent: bool --- Controls whether this method copies or moves
 583          `source_obj`. If `False` (the default), `source_obj` is copied into
 584          this collection. If `True`, `source_obj` is moved into this
 585          collection.
 586        """
 587        if target_name in self and not overwrite:
 588            raise IOError(errno.EEXIST, "File already exists", target_name)
 589
 590        modified_from = None
 591        if target_name in self:
 592            modified_from = self[target_name]
 593
 594        # Actually make the move or copy.
 595        if reparent:
 596            source_obj._reparent(self, target_name)
 597            item = source_obj
 598        else:
 599            item = source_obj.clone(self, target_name)
 600
 601        self._items[target_name] = item
 602        self.set_committed(False)
 603        if not self._has_remote_blocks and source_obj.has_remote_blocks():
 604            self.set_has_remote_blocks(True)
 605
 606        if modified_from:
 607            self.notify(MOD, self, target_name, (modified_from, item))
 608        else:
 609            self.notify(ADD, self, target_name, item)
 610
 611    def _get_src_target(self, source, target_path, source_collection, create_dest):
 612        if source_collection is None:
 613            source_collection = self
 614
 615        # Find the object
 616        if isinstance(source, basestring):
 617            source_obj = source_collection.find(source)
 618            if source_obj is None:
 619                raise IOError(errno.ENOENT, "File not found", source)
 620            sourcecomponents = source.split("/")
 621        else:
 622            source_obj = source
 623            sourcecomponents = None
 624
 625        # Find parent collection the target path
 626        targetcomponents = target_path.split("/")
 627
 628        # Determine the name to use.
 629        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 630
 631        if not target_name:
 632            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 633
 634        if create_dest:
 635            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 636        else:
 637            if len(targetcomponents) > 1:
 638                target_dir = self.find("/".join(targetcomponents[0:-1]))
 639            else:
 640                target_dir = self
 641
 642        if target_dir is None:
 643            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 644
 645        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
 646            target_dir = target_dir[target_name]
 647            target_name = sourcecomponents[-1]
 648
 649        return (source_obj, target_dir, target_name)
 650
 651    @must_be_writable
 652    @synchronized
 653    def copy(
 654            self,
 655            source: Union[str, CollectionItem],
 656            target_path: str,
 657            source_collection: Optional['RichCollectionBase']=None,
 658            overwrite: bool=False,
 659    ) -> None:
 660        """Copy a file or subcollection object to this collection
 661
 662        Arguments:
 663
 664        * source: str | arvados.arvfile.ArvadosFile |
 665          arvados.collection.Subcollection --- The file or subcollection to
 666          add to this collection. If `source` is a str, the object will be
 667          found by looking up this path from `source_collection` (see
 668          below).
 669
 670        * target_path: str --- The path inside this collection where the
 671          source object should be added.
 672
 673        * source_collection: arvados.collection.Collection | None --- The
 674          collection to find the source object from when `source` is a
 675          path. Defaults to the current collection (`self`).
 676
 677        * overwrite: bool --- Controls the behavior of this method when the
 678          collection already contains an object at `target_path`. If `False`
 679          (the default), this method will raise `FileExistsError`. If `True`,
 680          the object at `target_path` will be replaced with `source_obj`.
 681        """
 682        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 683        target_dir.add(source_obj, target_name, overwrite, False)
 684
 685    @must_be_writable
 686    @synchronized
 687    def rename(
 688            self,
 689            source: Union[str, CollectionItem],
 690            target_path: str,
 691            source_collection: Optional['RichCollectionBase']=None,
 692            overwrite: bool=False,
 693    ) -> None:
 694        """Move a file or subcollection object to this collection
 695
 696        Arguments:
 697
 698        * source: str | arvados.arvfile.ArvadosFile |
 699          arvados.collection.Subcollection --- The file or subcollection to
 700          add to this collection. If `source` is a str, the object will be
 701          found by looking up this path from `source_collection` (see
 702          below).
 703
 704        * target_path: str --- The path inside this collection where the
 705          source object should be added.
 706
 707        * source_collection: arvados.collection.Collection | None --- The
 708          collection to find the source object from when `source` is a
 709          path. Defaults to the current collection (`self`).
 710
 711        * overwrite: bool --- Controls the behavior of this method when the
 712          collection already contains an object at `target_path`. If `False`
 713          (the default), this method will raise `FileExistsError`. If `True`,
 714          the object at `target_path` will be replaced with `source_obj`.
 715        """
 716        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 717        if not source_obj.writable():
 718            raise IOError(errno.EROFS, "Source collection is read only", source)
 719        target_dir.add(source_obj, target_name, overwrite, True)
 720
 721    def portable_manifest_text(self, stream_name: str=".") -> str:
 722        """Get the portable manifest text for this collection
 723
 724        The portable manifest text is normalized, and does not include access
 725        tokens. This method does not flush outstanding blocks to Keep.
 726
 727        Arguments:
 728
 729        * stream_name: str --- The name to use for this collection's stream in
 730          the generated manifest. Default `'.'`.
 731        """
 732        return self._get_manifest_text(stream_name, True, True)
 733
 734    @synchronized
 735    def manifest_text(
 736            self,
 737            stream_name: str=".",
 738            strip: bool=False,
 739            normalize: bool=False,
 740            only_committed: bool=False,
 741    ) -> str:
 742        """Get the manifest text for this collection
 743
 744        Arguments:
 745
 746        * stream_name: str --- The name to use for this collection's stream in
 747          the generated manifest. Default `'.'`.
 748
 749        * strip: bool --- Controls whether or not the returned manifest text
 750          includes access tokens. If `False` (the default), the manifest text
 751          will include access tokens. If `True`, the manifest text will not
 752          include access tokens.
 753
 754        * normalize: bool --- Controls whether or not the returned manifest
 755          text is normalized. Default `False`.
 756
 757        * only_committed: bool --- Controls whether or not this method uploads
 758          pending data to Keep before building and returning the manifest text.
 759          If `False` (the default), this method will finish uploading all data
 760          to Keep, then return the final manifest. If `True`, this method will
 761          build and return a manifest that only refers to the data that has
 762          finished uploading at the time this method was called.
 763        """
 764        if not only_committed:
 765            self._my_block_manager().commit_all()
 766        return self._get_manifest_text(stream_name, strip, normalize,
 767                                       only_committed=only_committed)
 768
    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Build the manifest text for this collection, sub collections and files.

        Arguments:

        * stream_name: str --- Name to use for this stream (directory) in the
          generated manifest.

        * strip: bool --- If True, remove signing tokens from block locators
          if present.  If False, block locators are left unchanged.

        * normalize: bool --- If True, always export the manifest text in
          normalized form even if the Collection is not modified.  If False
          and the collection is not modified, return the original manifest
          text even if it is not in normalized form.

        * only_committed: bool --- If True, only include blocks that were
          already committed to Keep; segments still held in buffer blocks are
          silently omitted.
        """

        # Regenerate the manifest when there are uncommitted changes, there is
        # no cached manifest text, or the caller explicitly asked for a
        # normalized manifest.  Otherwise reuse the cached text below.
        if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            # First pass: collect every plain file at this level into a single
            # stream keyed by filename.
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Build the locator/range list for this file's segments.
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # Segment data is still in an in-memory buffer block.
                        if only_committed:
                            continue
                        # Use the buffer block's (possibly committed) locator
                        # instead of its temporary identifier.
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                         segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            # Second pass: recurse into subdirectories; each one contributes
            # its own stream(s) under an extended stream name.
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(
                    stream_name=os.path.join(stream_name, dirname),
                    strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            # Collection is unmodified: serve the cached manifest text.
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text
 822
 823    @synchronized
 824    def _copy_remote_blocks(self, remote_blocks={}):
 825        """Scan through the entire collection and ask Keep to copy remote blocks.
 826
 827        When accessing a remote collection, blocks will have a remote signature
 828        (+R instead of +A). Collect these signatures and request Keep to copy the
 829        blocks to the local cluster, returning local (+A) signatures.
 830
 831        :remote_blocks:
 832          Shared cache of remote to local block mappings. This is used to avoid
 833          doing extra work when blocks are shared by more than one file in
 834          different subdirectories.
 835
 836        """
 837        for item in self:
 838            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
 839        return remote_blocks
 840
 841    @synchronized
 842    def diff(
 843            self,
 844            end_collection: 'RichCollectionBase',
 845            prefix: str=".",
 846            holding_collection: Optional['Collection']=None,
 847    ) -> ChangeList:
 848        """Build a list of differences between this collection and another
 849
 850        Arguments:
 851
 852        * end_collection: arvados.collection.RichCollectionBase --- A
 853          collection object with the desired end state. The returned diff
 854          list will describe how to go from the current collection object
 855          `self` to `end_collection`.
 856
 857        * prefix: str --- The name to use for this collection's stream in
 858          the diff list. Default `'.'`.
 859
 860        * holding_collection: arvados.collection.Collection | None --- A
 861          collection object used to hold objects for the returned diff
 862          list. By default, a new empty collection is created.
 863        """
 864        changes = []
 865        if holding_collection is None:
 866            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
 867        for k in self:
 868            if k not in end_collection:
 869               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
 870        for k in end_collection:
 871            if k in self:
 872                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
 873                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
 874                elif end_collection[k] != self[k]:
 875                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 876                else:
 877                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 878            else:
 879                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
 880        return changes
 881
 882    @must_be_writable
 883    @synchronized
 884    def apply(self, changes: ChangeList) -> None:
 885        """Apply a list of changes from to this collection
 886
 887        This method takes a list of changes generated by
 888        `RichCollectionBase.diff` and applies it to this
 889        collection. Afterward, the state of this collection object will
 890        match the state of `end_collection` passed to `diff`. If a change
 891        conflicts with a local change, it will be saved to an alternate path
 892        indicating the conflict.
 893
 894        Arguments:
 895
 896        * changes: arvados.collection.ChangeList --- The list of differences
 897          generated by `RichCollectionBase.diff`.
 898        """
 899        if changes:
 900            self.set_committed(False)
 901        for change in changes:
 902            event_type = change[0]
 903            path = change[1]
 904            initial = change[2]
 905            local = self.find(path)
 906            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
 907                                                                    time.gmtime()))
 908            if event_type == ADD:
 909                if local is None:
 910                    # No local file at path, safe to copy over new file
 911                    self.copy(initial, path)
 912                elif local is not None and local != initial:
 913                    # There is already local file and it is different:
 914                    # save change to conflict file.
 915                    self.copy(initial, conflictpath)
 916            elif event_type == MOD or event_type == TOK:
 917                final = change[3]
 918                if local == initial:
 919                    # Local matches the "initial" item so it has not
 920                    # changed locally and is safe to update.
 921                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
 922                        # Replace contents of local file with new contents
 923                        local.replace_contents(final)
 924                    else:
 925                        # Overwrite path with new item; this can happen if
 926                        # path was a file and is now a collection or vice versa
 927                        self.copy(final, path, overwrite=True)
 928                else:
 929                    # Local is missing (presumably deleted) or local doesn't
 930                    # match the "start" value, so save change to conflict file
 931                    self.copy(final, conflictpath)
 932            elif event_type == DEL:
 933                if local == initial:
 934                    # Local item matches "initial" value, so it is safe to remove.
 935                    self.remove(path, recursive=True)
 936                # else, the file is modified or already removed, in either
 937                # case we don't want to try to remove it.
 938
 939    def portable_data_hash(self) -> str:
 940        """Get the portable data hash for this collection's manifest"""
 941        if self._manifest_locator and self.committed():
 942            # If the collection is already saved on the API server, and it's committed
 943            # then return API server's PDH response.
 944            return self._portable_data_hash
 945        else:
 946            stripped = self.portable_manifest_text().encode()
 947            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 948
 949    @synchronized
 950    def subscribe(self, callback: ChangeCallback) -> None:
 951        """Set a notify callback for changes to this collection
 952
 953        Arguments:
 954
 955        * callback: arvados.collection.ChangeCallback --- The callable to
 956          call each time the collection is changed.
 957        """
 958        if self._callback is None:
 959            self._callback = callback
 960        else:
 961            raise errors.ArgumentError("A callback is already set on this collection.")
 962
 963    @synchronized
 964    def unsubscribe(self) -> None:
 965        """Remove any notify callback set for changes to this collection"""
 966        if self._callback is not None:
 967            self._callback = None
 968
 969    @synchronized
 970    def notify(
 971            self,
 972            event: ChangeType,
 973            collection: 'RichCollectionBase',
 974            name: str,
 975            item: CollectionItem,
 976    ) -> None:
 977        """Notify any subscribed callback about a change to this collection
 978
 979        .. ATTENTION:: Internal
 980           This method is only meant to be used by other Collection methods.
 981
 982        If a callback has been registered with `RichCollectionBase.subscribe`,
 983        it will be called with information about a change to this collection.
 984        Then this notification will be propagated to this collection's root.
 985
 986        Arguments:
 987
 988        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 989          the collection.
 990
 991        * collection: arvados.collection.RichCollectionBase --- The
 992          collection that was modified.
 993
 994        * name: str --- The name of the file or stream within `collection` that
 995          was modified.
 996
 997        * item: arvados.arvfile.ArvadosFile |
 998          arvados.collection.Subcollection --- The new contents at `name`
 999          within `collection`.
1000        """
1001        if self._callback:
1002            self._callback(event, collection, name, item)
1003        self.root_collection().notify(event, collection, name, item)
1004
1005    @synchronized
1006    def __eq__(self, other: Any) -> bool:
1007        """Indicate whether this collection object is equal to another"""
1008        if other is self:
1009            return True
1010        if not isinstance(other, RichCollectionBase):
1011            return False
1012        if len(self._items) != len(other):
1013            return False
1014        for k in self._items:
1015            if k not in other:
1016                return False
1017            if self._items[k] != other[k]:
1018                return False
1019        return True
1020
1021    def __ne__(self, other: Any) -> bool:
1022        """Indicate whether this collection object is not equal to another"""
1023        return not self.__eq__(other)
1024
1025    @synchronized
1026    def flush(self) -> None:
1027        """Upload any pending data to Keep"""
1028        for e in listvalues(self):
1029            e.flush()
1030
1031
1032class Collection(RichCollectionBase):
1033    """Read and manipulate an Arvados collection
1034
1035    This class provides a high-level interface to create, read, and update
1036    Arvados collections and their contents. Refer to the Arvados Python SDK
1037    cookbook for [an introduction to using the Collection class][cookbook].
1038
1039    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1040    """
1041
1042    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1043                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1044                 keep_client: Optional['arvados.keep.KeepClient']=None,
1045                 num_retries: int=10,
1046                 parent: Optional['Collection']=None,
1047                 apiconfig: Optional[Mapping[str, str]]=None,
1048                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1049                 replication_desired: Optional[int]=None,
1050                 storage_classes_desired: Optional[List[str]]=None,
1051                 put_threads: Optional[int]=None):
1052        """Initialize a Collection object
1053
1054        Arguments:
1055
1056        * manifest_locator_or_text: str | None --- This string can contain a
1057          collection manifest text, portable data hash, or UUID. When given a
1058          portable data hash or UUID, this instance will load a collection
1059          record from the API server. Otherwise, this instance will represent a
1060          new collection without an API server record. The default value `None`
1061          instantiates a new collection with an empty manifest.
1062
1063        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1064          Arvados API client object this instance uses to make requests. If
1065          none is given, this instance creates its own client using the
1066          settings from `apiconfig` (see below). If your client instantiates
1067          many Collection objects, you can help limit memory utilization by
1068          calling `arvados.api.api` to construct an
1069          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1070          for every Collection.
1071
1072        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1073          object this instance uses to make requests. If none is given, this
1074          instance creates its own client using its `api_client`.
1075
1076        * num_retries: int --- The number of times that client requests are
1077          retried. Default 10.
1078
1079        * parent: arvados.collection.Collection | None --- The parent Collection
1080          object of this instance, if any. This argument is primarily used by
1081          other Collection methods; user client code shouldn't need to use it.
1082
1083        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1084          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1085          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1086          Collection object constructs one from these settings. If no
1087          mapping is provided, calls `arvados.config.settings` to get these
1088          parameters from user configuration.
1089
1090        * block_manager: arvados.arvfile._BlockManager | None --- The
1091          _BlockManager object used by this instance to coordinate reading
1092          and writing Keep data blocks. If none is given, this instance
1093          constructs its own. This argument is primarily used by other
1094          Collection methods; user client code shouldn't need to use it.
1095
1096        * replication_desired: int | None --- This controls both the value of
1097          the `replication_desired` field on API collection records saved by
1098          this class, as well as the number of Keep services that the object
1099          writes new data blocks to. If none is given, uses the default value
1100          configured for the cluster.
1101
1102        * storage_classes_desired: list[str] | None --- This controls both
1103          the value of the `storage_classes_desired` field on API collection
1104          records saved by this class, as well as selecting which specific
1105          Keep services the object writes new data blocks to. If none is
1106          given, defaults to an empty list.
1107
1108        * put_threads: int | None --- The number of threads to run
1109          simultaneously to upload data blocks to Keep. This value is used when
1110          building a new `block_manager`. It is unused when a `block_manager`
1111          is provided.
1112        """
1113
1114        if storage_classes_desired and type(storage_classes_desired) is not list:
1115            raise errors.ArgumentError("storage_classes_desired must be list type.")
1116
1117        super(Collection, self).__init__(parent)
1118        self._api_client = api_client
1119        self._keep_client = keep_client
1120
1121        # Use the keep client from ThreadSafeApiCache
1122        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1123            self._keep_client = self._api_client.keep
1124
1125        self._block_manager = block_manager
1126        self.replication_desired = replication_desired
1127        self._storage_classes_desired = storage_classes_desired
1128        self.put_threads = put_threads
1129
1130        if apiconfig:
1131            self._config = apiconfig
1132        else:
1133            self._config = config.settings()
1134
1135        self.num_retries = num_retries
1136        self._manifest_locator = None
1137        self._manifest_text = None
1138        self._portable_data_hash = None
1139        self._api_response = None
1140        self._past_versions = set()
1141
1142        self.lock = threading.RLock()
1143        self.events = None
1144
1145        if manifest_locator_or_text:
1146            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1147                self._manifest_locator = manifest_locator_or_text
1148            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1149                self._manifest_locator = manifest_locator_or_text
1150                if not self._has_local_collection_uuid():
1151                    self._has_remote_blocks = True
1152            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1153                self._manifest_text = manifest_locator_or_text
1154                if '+R' in self._manifest_text:
1155                    self._has_remote_blocks = True
1156            else:
1157                raise errors.ArgumentError(
1158                    "Argument to CollectionReader is not a manifest or a collection UUID")
1159
1160            try:
1161                self._populate()
1162            except errors.SyntaxError as e:
1163                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1164
1165    def storage_classes_desired(self) -> List[str]:
1166        """Get this collection's `storage_classes_desired` value"""
1167        return self._storage_classes_desired or []
1168
    def root_collection(self) -> 'Collection':
        """Get the root of this collection tree (this instance itself)"""
        return self
1171
1172    def get_properties(self) -> Properties:
1173        """Get this collection's properties
1174
1175        This method always returns a dict. If this collection object does not
1176        have an associated API record, or that record does not have any
1177        properties set, this method returns an empty dict.
1178        """
1179        if self._api_response and self._api_response["properties"]:
1180            return self._api_response["properties"]
1181        else:
1182            return {}
1183
1184    def get_trash_at(self) -> Optional[datetime.datetime]:
1185        """Get this collection's `trash_at` field
1186
1187        This method parses the `trash_at` field of the collection's API
1188        record and returns a datetime from it. If that field is not set, or
1189        this collection object does not have an associated API record,
1190        returns None.
1191        """
1192        if self._api_response and self._api_response["trash_at"]:
1193            try:
1194                return ciso8601.parse_datetime(self._api_response["trash_at"])
1195            except ValueError:
1196                return None
1197        else:
1198            return None
1199
    def stream_name(self) -> str:
        """Get the name of this collection's manifest stream (always `'.'`)"""
        return "."
1202
    def writable(self) -> bool:
        """Indicate whether this collection object can be modified (always True)"""
        return True
1205
    @synchronized
    def known_past_version(
            self,
            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
    ) -> bool:
        """Indicate whether an API record for this collection has been seen before

        As this collection object loads records from the API server, it records
        their `modified_at` and `portable_data_hash` fields. This method accepts
        a 2-tuple with values for those fields, and returns `True` if the
        combination was previously loaded.
        """
        # _past_versions holds the (modified_at, portable_data_hash) 2-tuples
        # recorded by _remember_api_response().
        return modified_at_and_portable_data_hash in self._past_versions
1219
1220    @synchronized
1221    @retry_method
1222    def update(
1223            self,
1224            other: Optional['Collection']=None,
1225            num_retries: Optional[int]=None,
1226    ) -> None:
1227        """Merge another collection's contents into this one
1228
1229        This method compares the manifest of this collection instance with
1230        another, then updates this instance's manifest with changes from the
1231        other, renaming files to flag conflicts where necessary.
1232
1233        When called without any arguments, this method reloads the collection's
1234        API record, and updates this instance with any changes that have
1235        appeared server-side. If this instance does not have a corresponding
1236        API record, this method raises `arvados.errors.ArgumentError`.
1237
1238        Arguments:
1239
1240        * other: arvados.collection.Collection | None --- The collection
1241          whose contents should be merged into this instance. When not
1242          provided, this method reloads this collection's API record and
1243          constructs a Collection object from it.  If this instance does not
1244          have a corresponding API record, this method raises
1245          `arvados.errors.ArgumentError`.
1246
1247        * num_retries: int | None --- The number of times to retry reloading
1248          the collection's API record from the API server. If not specified,
1249          uses the `num_retries` provided when this instance was constructed.
1250        """
1251        if other is None:
1252            if self._manifest_locator is None:
1253                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1254            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1255            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1256                response.get("portable_data_hash") != self.portable_data_hash()):
1257                # The record on the server is different from our current one, but we've seen it before,
1258                # so ignore it because it's already been merged.
1259                # However, if it's the same as our current record, proceed with the update, because we want to update
1260                # our tokens.
1261                return
1262            else:
1263                self._remember_api_response(response)
1264            other = CollectionReader(response["manifest_text"])
1265        baseline = CollectionReader(self._manifest_text)
1266        self.apply(baseline.diff(other))
1267        self._manifest_text = self.manifest_text()
1268
1269    @synchronized
1270    def _my_api(self):
1271        if self._api_client is None:
1272            self._api_client = ThreadSafeApiCache(self._config, version='v1')
1273            if self._keep_client is None:
1274                self._keep_client = self._api_client.keep
1275        return self._api_client
1276
1277    @synchronized
1278    def _my_keep(self):
1279        if self._keep_client is None:
1280            if self._api_client is None:
1281                self._my_api()
1282            else:
1283                self._keep_client = KeepClient(api_client=self._api_client)
1284        return self._keep_client
1285
1286    @synchronized
1287    def _my_block_manager(self):
1288        if self._block_manager is None:
1289            copies = (self.replication_desired or
1290                      self._my_api()._rootDesc.get('defaultCollectionReplication',
1291                                                   2))
1292            self._block_manager = _BlockManager(self._my_keep(),
1293                                                copies=copies,
1294                                                put_threads=self.put_threads,
1295                                                num_retries=self.num_retries,
1296                                                storage_classes_func=self.storage_classes_desired)
1297        return self._block_manager
1298
    def _remember_api_response(self, response):
        # Cache the latest API record and track its (modified_at,
        # portable_data_hash) pair so known_past_version() can recognize
        # records that have already been merged.
        self._api_response = response
        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1302
1303    def _populate_from_api_server(self):
1304        # As in KeepClient itself, we must wait until the last
1305        # possible moment to instantiate an API client, in order to
1306        # avoid tripping up clients that don't have access to an API
1307        # server.  If we do build one, make sure our Keep client uses
1308        # it.  If instantiation fails, we'll fall back to the except
1309        # clause, just like any other Collection lookup
1310        # failure. Return an exception, or None if successful.
1311        self._remember_api_response(self._my_api().collections().get(
1312            uuid=self._manifest_locator).execute(
1313                num_retries=self.num_retries))
1314        self._manifest_text = self._api_response['manifest_text']
1315        self._portable_data_hash = self._api_response['portable_data_hash']
1316        # If not overriden via kwargs, we should try to load the
1317        # replication_desired and storage_classes_desired from the API server
1318        if self.replication_desired is None:
1319            self.replication_desired = self._api_response.get('replication_desired', None)
1320        if self._storage_classes_desired is None:
1321            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1322
1323    def _populate(self):
1324        if self._manifest_text is None:
1325            if self._manifest_locator is None:
1326                return
1327            else:
1328                self._populate_from_api_server()
1329        self._baseline_manifest = self._manifest_text
1330        self._import_manifest(self._manifest_text)
1331
    def _has_collection_uuid(self):
        # True-ish when _manifest_locator looks like a collection UUID.
        # Note: returns the re.Match object (or None/False) rather than a
        # strict bool; callers use the result in boolean context.
        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1334
1335    def _has_local_collection_uuid(self):
1336        return self._has_collection_uuid and \
1337            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1338
    def __enter__(self):
        """Enter a context block with this collection instance"""
        return self
1341
1342    def __exit__(self, exc_type, exc_value, traceback):
1343        """Exit a context with this collection instance
1344
1345        If no exception was raised inside the context block, and this
1346        collection is writable and has a corresponding API record, that
1347        record will be updated to match the state of this instance at the end
1348        of the block.
1349        """
1350        if exc_type is None:
1351            if self.writable() and self._has_collection_uuid():
1352                self.save()
1353        self.stop_threads()
1354
1355    def stop_threads(self) -> None:
1356        """Stop background Keep upload/download threads"""
1357        if self._block_manager is not None:
1358            self._block_manager.stop_threads()
1359
    @synchronized
    def manifest_locator(self) -> Optional[str]:
        """Get this collection's manifest locator, if any

        * If this collection instance is associated with an API record with a
          UUID, return that.
        * Otherwise, if this collection instance was loaded from an API record
          by portable data hash, return that.
        * Otherwise, return `None`.
        """
        # _manifest_locator is set by the constructor when it is given a UUID
        # or portable data hash to load.
        return self._manifest_locator
1371
1372    @synchronized
1373    def clone(
1374            self,
1375            new_parent: Optional['Collection']=None,
1376            new_name: Optional[str]=None,
1377            readonly: bool=False,
1378            new_config: Optional[Mapping[str, str]]=None,
1379    ) -> 'Collection':
1380        """Create a Collection object with the same contents as this instance
1381
1382        This method creates a new Collection object with contents that match
1383        this instance's. The new collection will not be associated with any API
1384        record.
1385
1386        Arguments:
1387
1388        * new_parent: arvados.collection.Collection | None --- This value is
1389          passed to the new Collection's constructor as the `parent`
1390          argument.
1391
1392        * new_name: str | None --- This value is unused.
1393
1394        * readonly: bool --- If this value is true, this method constructs and
1395          returns a `CollectionReader`. Otherwise, it returns a mutable
1396          `Collection`. Default `False`.
1397
1398        * new_config: Mapping[str, str] | None --- This value is passed to the
1399          new Collection's constructor as `apiconfig`. If no value is provided,
1400          defaults to the configuration passed to this instance's constructor.
1401        """
1402        if new_config is None:
1403            new_config = self._config
1404        if readonly:
1405            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1406        else:
1407            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1408
1409        newcollection._clonefrom(self)
1410        return newcollection
1411
    @synchronized
    def api_response(self) -> Optional[Dict[str, Any]]:
        """Get this instance's associated API record

        If this Collection instance has an associated API record, return it.
        Otherwise, return `None`.
        """
        # Populated by _remember_api_response() whenever a record is loaded
        # from the API server.
        return self._api_response
1420
1421    def find_or_create(
1422            self,
1423            path: str,
1424            create_type: CreateType,
1425    ) -> CollectionItem:
1426        if path == ".":
1427            return self
1428        else:
1429            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1430
1431    def find(self, path: str) -> CollectionItem:
1432        if path == ".":
1433            return self
1434        else:
1435            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1436
1437    def remove(self, path: str, recursive: bool=False) -> None:
1438        if path == ".":
1439            raise errors.ArgumentError("Cannot remove '.'")
1440        else:
1441            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1442
    @must_be_writable
    @synchronized
    @retry_method
    def save(
            self,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            merge: bool=True,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ) -> str:
        """Save collection to an existing API record

        This method updates the instance's corresponding API record to match
        the instance's state. If this instance does not have a corresponding API
        record yet, raises `AssertionError`. (To create a new API record, use
        `Collection.save_new`.) This method returns the saved collection
        manifest.

        Arguments:

        * properties: dict[str, Any] | None --- If provided, the API record will
          be updated with these properties. Note this will completely replace
          any existing properties.

        * storage_classes: list[str] | None --- If provided, the API record will
          be updated with this value in the `storage_classes_desired` field.
          This value will also be saved on the instance and used for any
          changes that follow.

        * trash_at: datetime.datetime | None --- If provided, the API record
          will be updated with this value in the `trash_at` field.

        * merge: bool --- If `True` (the default), this method will first
          reload this collection's API record, and merge any new contents into
          this instance before saving changes. See `Collection.update` for
          details.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed directly to
          the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        # Validate argument types up front, before modifying any state or
        # contacting the API server.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        # Collect the record fields to send with the update.
        body={}
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            # NOTE(review): the datetime is formatted as-is with a literal 'Z'
            # suffix -- this assumes the caller passes a UTC timestamp; confirm.
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            # There are local content changes to push to the API server.
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            # Make sure all data blocks are written to Keep before updating
            # the manifest on the API server.
            self._my_block_manager().commit_all()

            if merge:
                self.update()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))
            # Adopt the server's returned manifest as our committed state.
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)
        elif body:
            # No content changes, but record metadata (properties, storage
            # classes, trash_at, preserve_version) still needs updating.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))

        return self._manifest_text
1550
1551
    @must_be_writable
    @synchronized
    @retry_method
    def save_new(
            self,
            name: Optional[str]=None,
            create_collection_record: bool=True,
            owner_uuid: Optional[str]=None,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            ensure_unique_name: bool=False,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ):
        """Save collection to a new API record

        This method finishes uploading new data blocks and (optionally)
        creates a new API collection record with the provided data. If a new
        record is created, this instance becomes associated with that record
        for future updates like `save()`. This method returns the saved
        collection manifest.

        Arguments:

        * name: str | None --- The `name` field to use on the new collection
          record. If not specified, a generic default name is generated.

        * create_collection_record: bool --- If `True` (the default), creates a
          collection record on the API server. If `False`, the method finishes
          all data uploads and only returns the resulting collection manifest
          without sending it to the API server.

        * owner_uuid: str | None --- The `owner_uuid` field to use on the
          new collection record.

        * properties: dict[str, Any] | None --- The `properties` field to use on
          the new collection record.

        * storage_classes: list[str] | None --- The
          `storage_classes_desired` field to use on the new collection record.

        * trash_at: datetime.datetime | None --- The `trash_at` field to use
          on the new collection record.

        * ensure_unique_name: bool --- This value is passed directly to the
          Arvados API when creating the collection record. If `True`, the API
          server may modify the submitted `name` to ensure the collection's
          `name`+`owner_uuid` combination is unique. If `False` (the default),
          if a collection already exists with this same `name`+`owner_uuid`
          combination, creating a collection record will raise a validation
          error.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed directly to
          the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        # Validate argument types up front, before modifying any state or
        # contacting the API server.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False

        if storage_classes:
            self._storage_classes_desired = storage_classes

        # Upload any outstanding data blocks before building the manifest.
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                # Use a placeholder name and let the server uniquify it.
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "name": name,
                    "replication_desired": self.replication_desired}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid
            if properties:
                body["properties"] = properties
            if self.storage_classes_desired():
                body["storage_classes_desired"] = self.storage_classes_desired()
            if trash_at:
                # NOTE(review): formatted as-is with a literal 'Z' suffix --
                # assumes the caller passes a UTC timestamp; confirm.
                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                body["trash_at"] = t
            if preserve_version:
                body["preserve_version"] = preserve_version

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            # Adopt the server's returned manifest and identifiers, and
            # associate this instance with the new record.
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]
            self._portable_data_hash = self._api_response["portable_data_hash"]

            self._manifest_text = text
            self.set_committed(True)

        return text
1669
    # Parsers for manifest text, used by _import_manifest:
    # _token_re splits the manifest into whitespace-separated tokens,
    # capturing the token and its trailing separator (newline ends a stream).
    # _block_re matches a block locator: 32 hex digits, "+<size>", optional
    # "+hint" suffixes. _segment_re matches a file segment token of the form
    # "<position>:<size>:<filename>".
    _token_re = re.compile(r'(\S+)(\s+|$)')
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1673
1674    def _unescape_manifest_path(self, path):
1675        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1676
1677    @synchronized
1678    def _import_manifest(self, manifest_text):
1679        """Import a manifest into a `Collection`.
1680
1681        :manifest_text:
1682          The manifest text to import from.
1683
1684        """
1685        if len(self) > 0:
1686            raise ArgumentError("Can only import manifest into an empty collection")
1687
1688        STREAM_NAME = 0
1689        BLOCKS = 1
1690        SEGMENTS = 2
1691
1692        stream_name = None
1693        state = STREAM_NAME
1694
1695        for token_and_separator in self._token_re.finditer(manifest_text):
1696            tok = token_and_separator.group(1)
1697            sep = token_and_separator.group(2)
1698
1699            if state == STREAM_NAME:
1700                # starting a new stream
1701                stream_name = self._unescape_manifest_path(tok)
1702                blocks = []
1703                segments = []
1704                streamoffset = 0
1705                state = BLOCKS
1706                self.find_or_create(stream_name, COLLECTION)
1707                continue
1708
1709            if state == BLOCKS:
1710                block_locator = self._block_re.match(tok)
1711                if block_locator:
1712                    blocksize = int(block_locator.group(1))
1713                    blocks.append(Range(tok, streamoffset, blocksize, 0))
1714                    streamoffset += blocksize
1715                else:
1716                    state = SEGMENTS
1717
1718            if state == SEGMENTS:
1719                file_segment = self._segment_re.match(tok)
1720                if file_segment:
1721                    pos = int(file_segment.group(1))
1722                    size = int(file_segment.group(2))
1723                    name = self._unescape_manifest_path(file_segment.group(3))
1724                    if name.split('/')[-1] == '.':
1725                        # placeholder for persisting an empty directory, not a real file
1726                        if len(name) > 2:
1727                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1728                    else:
1729                        filepath = os.path.join(stream_name, name)
1730                        try:
1731                            afile = self.find_or_create(filepath, FILE)
1732                        except IOError as e:
1733                            if e.errno == errno.ENOTDIR:
1734                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1735                            else:
1736                                raise e from None
1737                        if isinstance(afile, ArvadosFile):
1738                            afile.add_segment(blocks, pos, size)
1739                        else:
1740                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1741                else:
1742                    # error!
1743                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1744
1745            if sep == "\n":
1746                stream_name = None
1747                state = STREAM_NAME
1748
1749        self.set_committed(True)
1750
1751    @synchronized
1752    def notify(
1753            self,
1754            event: ChangeType,
1755            collection: 'RichCollectionBase',
1756            name: str,
1757            item: CollectionItem,
1758    ) -> None:
1759        if self._callback:
1760            self._callback(event, collection, name, item)
1761
1762
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) inside an
    Arvados `Collection`. Instances are returned by `Collection.find` and
    offer the same API. Operations that touch the API collection record are
    delegated to the root `Collection` object.
    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock so all nested streams synchronize
        # on the same object.
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self) -> 'Collection':
        """Return the top-level `Collection` this stream belongs to."""
        return self.parent.root_collection()

    def writable(self) -> bool:
        """A subcollection is writable exactly when its root collection is."""
        return self.root_collection().writable()

    def _my_api(self):
        # API access is delegated to the root collection.
        return self.root_collection()._my_api()

    def _my_keep(self):
        # Keep access is delegated to the root collection.
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        # Block management is delegated to the root collection.
        return self.root_collection()._my_block_manager()

    def stream_name(self) -> str:
        """Return the full stream name: the parent's stream name plus ours."""
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Return a deep copy of this stream under `new_parent`/`new_name`."""
        copied = Subcollection(new_parent, new_name)
        copied._clonefrom(self)
        return copied

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        # Detach from the current parent, then reattach under the new one.
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Encode empty directories by using an \056-named (".") empty file"""
        if self._items:
            return super(Subcollection, self)._get_manifest_text(stream_name,
                                                                 strip, normalize,
                                                                 only_committed)
        # Empty stream: emit a placeholder "." file so the directory persists.
        return "%s %s 0:0:\\056\n" % (
            escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1826
1827
class CollectionReader(Collection):
    """Read-only `Collection` subclass

    This class never creates or updates any API collection record. Use it
    for extra code safety when you only need to read existing collections.
    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # Writes are only allowed while the base constructor populates the
        # collection from the manifest; writable() checks this flag.
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since the collection never changes once initialized.
        self.lock = NoopLock()

        # Backwards compatibility with the old CollectionReader
        # all_streams() and all_files() API.
        self._streams = None

    def writable(self) -> bool:
        return self._in_init

    def _populate_streams(orig_func):
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it creates a
            # copy of the manifest.
            if self._streams is None:
                text = self._manifest_text
                if text:
                    self._streams = [line.split()
                                     for line in text.split("\n")
                                     if line]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def normalize(self):
        """Normalize the streams returned by `all_streams`"""
        streams = {}
        for stream in self.all_streams():
            for stream_file in stream.all_files():
                streamname, filename = split(stream.name() + "/" + stream_file.name())
                file_ranges = streams.setdefault(streamname, {}).setdefault(filename, [])
                for segment in stream_file.segments:
                    file_ranges.extend(
                        stream.locators_and_ranges(segment.locator, segment.range_size))

        self._streams = [normalize_stream(name, streams[name])
                         for name in sorted(streams)]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_streams(self):
        """Return a StreamReader for every stream in the manifest."""
        return [StreamReader(tokens, self._my_keep(), num_retries=self.num_retries)
                for tokens in self._streams]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_files(self):
        """Iterate over every file in every stream."""
        for stream in self.all_streams():
            for stream_file in stream.all_files():
                yield stream_file
1894
1895
1896class CollectionWriter(CollectionBase):
1897    """Create a new collection from scratch
1898
1899    .. WARNING:: Deprecated
1900       This class is deprecated. Prefer `arvados.collection.Collection`
1901       instead.
1902    """
1903
1904    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1905    def __init__(self, api_client=None, num_retries=0, replication=None):
1906        """Instantiate a CollectionWriter.
1907
1908        CollectionWriter lets you build a new Arvados Collection from scratch.
1909        Write files to it.  The CollectionWriter will upload data to Keep as
1910        appropriate, and provide you with the Collection manifest text when
1911        you're finished.
1912
1913        Arguments:
1914        * api_client: The API client to use to look up Collections.  If not
1915          provided, CollectionReader will build one from available Arvados
1916          configuration.
1917        * num_retries: The default number of times to retry failed
1918          service requests.  Default 0.  You may change this value
1919          after instantiation, but note those changes may not
1920          propagate to related objects like the Keep client.
1921        * replication: The number of copies of each block to store.
1922          If this argument is None or not supplied, replication is
1923          the server-provided default if available, otherwise 2.
1924        """
1925        self._api_client = api_client
1926        self.num_retries = num_retries
1927        self.replication = (2 if replication is None else replication)
1928        self._keep_client = None
1929        self._data_buffer = []
1930        self._data_buffer_len = 0
1931        self._current_stream_files = []
1932        self._current_stream_length = 0
1933        self._current_stream_locators = []
1934        self._current_stream_name = '.'
1935        self._current_file_name = None
1936        self._current_file_pos = 0
1937        self._finished_streams = []
1938        self._close_file = None
1939        self._queued_file = None
1940        self._queued_dirents = deque()
1941        self._queued_trees = deque()
1942        self._last_open = None
1943
1944    def __exit__(self, exc_type, exc_value, traceback):
1945        if exc_type is None:
1946            self.finish()
1947
1948    def do_queued_work(self):
1949        # The work queue consists of three pieces:
1950        # * _queued_file: The file object we're currently writing to the
1951        #   Collection.
1952        # * _queued_dirents: Entries under the current directory
1953        #   (_queued_trees[0]) that we want to write or recurse through.
1954        #   This may contain files from subdirectories if
1955        #   max_manifest_depth == 0 for this directory.
1956        # * _queued_trees: Directories that should be written as separate
1957        #   streams to the Collection.
1958        # This function handles the smallest piece of work currently queued
1959        # (current file, then current directory, then next directory) until
1960        # no work remains.  The _work_THING methods each do a unit of work on
1961        # THING.  _queue_THING methods add a THING to the work queue.
1962        while True:
1963            if self._queued_file:
1964                self._work_file()
1965            elif self._queued_dirents:
1966                self._work_dirents()
1967            elif self._queued_trees:
1968                self._work_trees()
1969            else:
1970                break
1971
1972    def _work_file(self):
1973        while True:
1974            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
1975            if not buf:
1976                break
1977            self.write(buf)
1978        self.finish_current_file()
1979        if self._close_file:
1980            self._queued_file.close()
1981        self._close_file = None
1982        self._queued_file = None
1983
1984    def _work_dirents(self):
1985        path, stream_name, max_manifest_depth = self._queued_trees[0]
1986        if stream_name != self.current_stream_name():
1987            self.start_new_stream(stream_name)
1988        while self._queued_dirents:
1989            dirent = self._queued_dirents.popleft()
1990            target = os.path.join(path, dirent)
1991            if os.path.isdir(target):
1992                self._queue_tree(target,
1993                                 os.path.join(stream_name, dirent),
1994                                 max_manifest_depth - 1)
1995            else:
1996                self._queue_file(target, dirent)
1997                break
1998        if not self._queued_dirents:
1999            self._queued_trees.popleft()
2000
2001    def _work_trees(self):
2002        path, stream_name, max_manifest_depth = self._queued_trees[0]
2003        d = arvados.util.listdir_recursive(
2004            path, max_depth = (None if max_manifest_depth == 0 else 0))
2005        if d:
2006            self._queue_dirents(stream_name, d)
2007        else:
2008            self._queued_trees.popleft()
2009
2010    def _queue_file(self, source, filename=None):
2011        assert (self._queued_file is None), "tried to queue more than one file"
2012        if not hasattr(source, 'read'):
2013            source = open(source, 'rb')
2014            self._close_file = True
2015        else:
2016            self._close_file = False
2017        if filename is None:
2018            filename = os.path.basename(source.name)
2019        self.start_new_file(filename)
2020        self._queued_file = source
2021
2022    def _queue_dirents(self, stream_name, dirents):
2023        assert (not self._queued_dirents), "tried to queue more than one tree"
2024        self._queued_dirents = deque(sorted(dirents))
2025
    def _queue_tree(self, path, stream_name, max_manifest_depth):
        """Add a directory tree to the queue of streams to write."""
        self._queued_trees.append((path, stream_name, max_manifest_depth))
2028
    def write_file(self, source, filename=None):
        """Write one file (a path or file-like object) to the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()
2032
    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a whole directory tree to the collection under `stream_name`."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
2037
2038    def write(self, newdata):
2039        if isinstance(newdata, bytes):
2040            pass
2041        elif isinstance(newdata, str):
2042            newdata = newdata.encode()
2043        elif hasattr(newdata, '__iter__'):
2044            for s in newdata:
2045                self.write(s)
2046            return
2047        self._data_buffer.append(newdata)
2048        self._data_buffer_len += len(newdata)
2049        self._current_stream_length += len(newdata)
2050        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2051            self.flush_data()
2052
2053    def open(self, streampath, filename=None):
2054        """open(streampath[, filename]) -> file-like object
2055
2056        Pass in the path of a file to write to the Collection, either as a
2057        single string or as two separate stream name and file name arguments.
2058        This method returns a file-like object you can write to add it to the
2059        Collection.
2060
2061        You may only have one file object from the Collection open at a time,
2062        so be sure to close the object when you're done.  Using the object in
2063        a with statement makes that easy:
2064
2065            with cwriter.open('./doc/page1.txt') as outfile:
2066                outfile.write(page1_data)
2067            with cwriter.open('./doc/page2.txt') as outfile:
2068                outfile.write(page2_data)
2069        """
2070        if filename is None:
2071            streampath, filename = split(streampath)
2072        if self._last_open and not self._last_open.closed:
2073            raise errors.AssertionError(
2074                u"can't open '{}' when '{}' is still open".format(
2075                    filename, self._last_open.name))
2076        if streampath != self.current_stream_name():
2077            self.start_new_stream(streampath)
2078        self.set_current_file_name(filename)
2079        self._last_open = _WriterFile(self, filename)
2080        return self._last_open
2081
2082    def flush_data(self):
2083        data_buffer = b''.join(self._data_buffer)
2084        if data_buffer:
2085            self._current_stream_locators.append(
2086                self._my_keep().put(
2087                    data_buffer[0:config.KEEP_BLOCK_SIZE],
2088                    copies=self.replication))
2089            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2090            self._data_buffer_len = len(self._data_buffer[0])
2091
    def start_new_file(self, newfilename=None):
        """Finish the current file and start a new one with the given name."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)
2095
2096    def set_current_file_name(self, newfilename):
2097        if re.search(r'[\t\n]', newfilename):
2098            raise errors.AssertionError(
2099                "Manifest filenames cannot contain whitespace: %s" %
2100                newfilename)
2101        elif re.search(r'\x00', newfilename):
2102            raise errors.AssertionError(
2103                "Manifest filenames cannot contain NUL characters: %s" %
2104                newfilename)
2105        self._current_file_name = newfilename
2106
    def current_file_name(self):
        """Return the name of the file currently being written, or None."""
        return self._current_file_name
2109
2110    def finish_current_file(self):
2111        if self._current_file_name is None:
2112            if self._current_file_pos == self._current_stream_length:
2113                return
2114            raise errors.AssertionError(
2115                "Cannot finish an unnamed file " +
2116                "(%d bytes at offset %d in '%s' stream)" %
2117                (self._current_stream_length - self._current_file_pos,
2118                 self._current_file_pos,
2119                 self._current_stream_name))
2120        self._current_stream_files.append([
2121                self._current_file_pos,
2122                self._current_stream_length - self._current_file_pos,
2123                self._current_file_name])
2124        self._current_file_pos = self._current_stream_length
2125        self._current_file_name = None
2126
    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and start a new one with the given name."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
2130
2131    def set_current_stream_name(self, newstreamname):
2132        if re.search(r'[\t\n]', newstreamname):
2133            raise errors.AssertionError(
2134                "Manifest stream names cannot contain whitespace: '%s'" %
2135                (newstreamname))
2136        self._current_stream_name = '.' if newstreamname=='' else newstreamname
2137
    def current_stream_name(self):
        """Return the name of the stream currently being written, or None."""
        return self._current_stream_name
2140
2141    def finish_current_stream(self):
2142        self.finish_current_file()
2143        self.flush_data()
2144        if not self._current_stream_files:
2145            pass
2146        elif self._current_stream_name is None:
2147            raise errors.AssertionError(
2148                "Cannot finish an unnamed stream (%d bytes in %d files)" %
2149                (self._current_stream_length, len(self._current_stream_files)))
2150        else:
2151            if not self._current_stream_locators:
2152                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2153            self._finished_streams.append([self._current_stream_name,
2154                                           self._current_stream_locators,
2155                                           self._current_stream_files])
2156        self._current_stream_files = []
2157        self._current_stream_length = 0
2158        self._current_stream_locators = []
2159        self._current_stream_name = None
2160        self._current_file_pos = 0
2161        self._current_file_name = None
2162
2163    def finish(self):
2164        """Store the manifest in Keep and return its locator.
2165
2166        This is useful for storing manifest fragments (task outputs)
2167        temporarily in Keep during a Crunch job.
2168
2169        In other cases you should make a collection instead, by
2170        sending manifest_text() to the API server's "create
2171        collection" endpoint.
2172        """
2173        return self._my_keep().put(self.manifest_text().encode(),
2174                                   copies=self.replication)
2175
2176    def portable_data_hash(self):
2177        stripped = self.stripped_manifest().encode()
2178        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
2179
2180    def manifest_text(self):
2181        self.finish_current_stream()
2182        manifest = ''
2183
2184        for stream in self._finished_streams:
2185            if not re.search(r'^\.(/.*)?$', stream[0]):
2186                manifest += './'
2187            manifest += stream[0].replace(' ', '\\040')
2188            manifest += ' ' + ' '.join(stream[1])
2189            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2190            manifest += "\n"
2191
2192        return manifest
2193
2194    def data_locators(self):
2195        ret = []
2196        for name, locators, files in self._finished_streams:
2197            ret += locators
2198        return ret
2199
2200    def save_new(self, name=None):
2201        return self._api_client.collections().create(
2202            ensure_unique_name=True,
2203            body={
2204                'name': name,
2205                'manifest_text': self.manifest_text(),
2206            }).execute(num_retries=self.num_retries)
2207
2208
2209class ResumableCollectionWriter(CollectionWriter):
2210    """CollectionWriter that can serialize internal state to disk
2211
2212    .. WARNING:: Deprecated
2213       This class is deprecated. Prefer `arvados.collection.Collection`
2214       instead.
2215    """
2216
2217    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
2218                   '_current_stream_locators', '_current_stream_name',
2219                   '_current_file_name', '_current_file_pos', '_close_file',
2220                   '_data_buffer', '_dependencies', '_finished_streams',
2221                   '_queued_dirents', '_queued_trees']
2222
2223    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
2224    def __init__(self, api_client=None, **kwargs):
2225        self._dependencies = {}
2226        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
2227
2228    @classmethod
2229    def from_state(cls, state, *init_args, **init_kwargs):
2230        # Try to build a new writer from scratch with the given state.
2231        # If the state is not suitable to resume (because files have changed,
2232        # been deleted, aren't predictable, etc.), raise a
2233        # StaleWriterStateError.  Otherwise, return the initialized writer.
2234        # The caller is responsible for calling writer.do_queued_work()
2235        # appropriately after it's returned.
2236        writer = cls(*init_args, **init_kwargs)
2237        for attr_name in cls.STATE_PROPS:
2238            attr_value = state[attr_name]
2239            attr_class = getattr(writer, attr_name).__class__
2240            # Coerce the value into the same type as the initial value, if
2241            # needed.
2242            if attr_class not in (type(None), attr_value.__class__):
2243                attr_value = attr_class(attr_value)
2244            setattr(writer, attr_name, attr_value)
2245        # Check dependencies before we try to resume anything.
2246        if any(KeepLocator(ls).permission_expired()
2247               for ls in writer._current_stream_locators):
2248            raise errors.StaleWriterStateError(
2249                "locators include expired permission hint")
2250        writer.check_dependencies()
2251        if state['_current_file'] is not None:
2252            path, pos = state['_current_file']
2253            try:
2254                writer._queued_file = open(path, 'rb')
2255                writer._queued_file.seek(pos)
2256            except IOError as error:
2257                raise errors.StaleWriterStateError(
2258                    u"failed to reopen active file {}: {}".format(path, error))
2259        return writer
2260
2261    def check_dependencies(self):
2262        for path, orig_stat in listitems(self._dependencies):
2263            if not S_ISREG(orig_stat[ST_MODE]):
2264                raise errors.StaleWriterStateError(u"{} not file".format(path))
2265            try:
2266                now_stat = tuple(os.stat(path))
2267            except OSError as error:
2268                raise errors.StaleWriterStateError(
2269                    u"failed to stat {}: {}".format(path, error))
2270            if ((not S_ISREG(now_stat[ST_MODE])) or
2271                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
2272                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
2273                raise errors.StaleWriterStateError(u"{} changed".format(path))
2274
2275    def dump_state(self, copy_func=lambda x: x):
2276        state = {attr: copy_func(getattr(self, attr))
2277                 for attr in self.STATE_PROPS}
2278        if self._queued_file is None:
2279            state['_current_file'] = None
2280        else:
2281            state['_current_file'] = (os.path.realpath(self._queued_file.name),
2282                                      self._queued_file.tell())
2283        return state
2284
2285    def _queue_file(self, source, filename=None):
2286        try:
2287            src_path = os.path.realpath(source)
2288        except Exception:
2289            raise errors.AssertionError(u"{} not a file path".format(source))
2290        try:
2291            path_stat = os.stat(src_path)
2292        except OSError as stat_error:
2293            path_stat = None
2294        super(ResumableCollectionWriter, self)._queue_file(source, filename)
2295        fd_stat = os.fstat(self._queued_file.fileno())
2296        if not S_ISREG(fd_stat.st_mode):
2297            # We won't be able to resume from this cache anyway, so don't
2298            # worry about further checks.
2299            self._dependencies[source] = tuple(fd_stat)
2300        elif path_stat is None:
2301            raise errors.AssertionError(
2302                u"could not stat {}: {}".format(source, stat_error))
2303        elif path_stat.st_ino != fd_stat.st_ino:
2304            raise errors.AssertionError(
2305                u"{} changed between open and stat calls".format(source))
2306        else:
2307            self._dependencies[src_path] = tuple(fd_stat)
2308
2309    def write(self, data):
2310        if self._queued_file is None:
2311            raise errors.AssertionError(
2312                "resumable writer can't accept unsourced data")
2313        return super(ResumableCollectionWriter, self).write(data)
ADD = 'add'

Argument value for Collection methods to represent an added item

DEL = 'del'

Argument value for Collection methods to represent a removed item

MOD = 'mod'

Argument value for Collection methods to represent a modified item

TOK = 'tok'

Argument value for Collection methods to represent an item with token differences

FILE = 'file'

create_type value for Collection.find_or_create

COLLECTION = 'collection'

create_type value for Collection.find_or_create

ChangeList = typing.List[typing.Union[typing.Tuple[typing.Literal['add', 'del'], str, ForwardRef('Collection')], typing.Tuple[typing.Literal['mod', 'tok'], str, ForwardRef('Collection'), ForwardRef('Collection')]]]
ChangeType = typing.Literal['add', 'del', 'mod', 'tok']
CollectionItem = typing.Union[arvados.arvfile.ArvadosFile, ForwardRef('Collection')]
ChangeCallback = typing.Callable[[typing.Literal['add', 'del', 'mod', 'tok'], ForwardRef('Collection'), str, typing.Union[arvados.arvfile.ArvadosFile, ForwardRef('Collection')]], object]
CreateType = typing.Literal['collection', 'file']
Properties = typing.Dict[str, typing.Any]
StorageClasses = typing.List[str]
class CollectionBase:
 93class CollectionBase(object):
 94    """Abstract base class for Collection classes
 95
 96    .. ATTENTION:: Internal
 97       This class is meant to be used by other parts of the SDK. User code
 98       should instantiate or subclass `Collection` or one of its subclasses
 99       directly.
100    """
101
102    def __enter__(self):
103        """Enter a context block with this collection instance"""
104        return self
105
106    def __exit__(self, exc_type, exc_value, traceback):
107        """Exit a context block with this collection instance"""
108        pass
109
110    def _my_keep(self):
111        if self._keep_client is None:
112            self._keep_client = KeepClient(api_client=self._api_client,
113                                           num_retries=self.num_retries)
114        return self._keep_client
115
116    def stripped_manifest(self) -> str:
117        """Create a copy of the collection manifest with only size hints
118
119        This method returns a string with the current collection's manifest
120        text with all non-portable locator hints like permission hints and
121        remote cluster hints removed. The only hints in the returned manifest
122        will be size hints.
123        """
124        raw = self.manifest_text()
125        clean = []
126        for line in raw.split("\n"):
127            fields = line.split()
128            if fields:
129                clean_fields = fields[:1] + [
130                    (re.sub(r'\+[^\d][^\+]*', '', x)
131                     if re.match(arvados.util.keep_locator_pattern, x)
132                     else x)
133                    for x in fields[1:]]
134                clean += [' '.join(clean_fields), "\n"]
135        return ''.join(clean)

Abstract base class for Collection classes

def stripped_manifest(self) -> str:
116    def stripped_manifest(self) -> str:
117        """Create a copy of the collection manifest with only size hints
118
119        This method returns a string with the current collection's manifest
120        text with all non-portable locator hints like permission hints and
121        remote cluster hints removed. The only hints in the returned manifest
122        will be size hints.
123        """
124        raw = self.manifest_text()
125        clean = []
126        for line in raw.split("\n"):
127            fields = line.split()
128            if fields:
129                clean_fields = fields[:1] + [
130                    (re.sub(r'\+[^\d][^\+]*', '', x)
131                     if re.match(arvados.util.keep_locator_pattern, x)
132                     else x)
133                    for x in fields[1:]]
134                clean += [' '.join(clean_fields), "\n"]
135        return ''.join(clean)

Create a copy of the collection manifest with only size hints

This method returns a string with the current collection’s manifest text with all non-portable locator hints like permission hints and remote cluster hints removed. The only hints in the returned manifest will be size hints.

class RichCollectionBase(CollectionBase):
 161class RichCollectionBase(CollectionBase):
 162    """Base class for Collection classes
 163
 164    .. ATTENTION:: Internal
 165       This class is meant to be used by other parts of the SDK. User code
 166       should instantiate or subclass `Collection` or one of its subclasses
 167       directly.
 168    """
 169
 170    def __init__(self, parent=None):
 171        self.parent = parent
 172        self._committed = False
 173        self._has_remote_blocks = False
 174        self._callback = None
 175        self._items = {}
 176
 177    def _my_api(self):
 178        raise NotImplementedError()
 179
 180    def _my_keep(self):
 181        raise NotImplementedError()
 182
 183    def _my_block_manager(self):
 184        raise NotImplementedError()
 185
 186    def writable(self) -> bool:
 187        """Indicate whether this collection object can be modified
 188
 189        This method returns `False` if this object is a `CollectionReader`,
 190        else `True`.
 191        """
 192        raise NotImplementedError()
 193
 194    def root_collection(self) -> 'Collection':
 195        """Get this collection's root collection object
 196
 197        If you open a subcollection with `Collection.find`, calling this method
 198        on that subcollection returns the source Collection object.
 199        """
 200        raise NotImplementedError()
 201
 202    def stream_name(self) -> str:
 203        """Get the name of the manifest stream represented by this collection
 204
 205        If you open a subcollection with `Collection.find`, calling this method
 206        on that subcollection returns the name of the stream you opened.
 207        """
 208        raise NotImplementedError()
 209
 210    @synchronized
 211    def has_remote_blocks(self) -> bool:
 212    """Indicate whether the collection refers to remote data
 213
 214        Returns `True` if the collection manifest includes any Keep locators
 215        with a remote hint (`+R`), else `False`.
 216        """
 217        if self._has_remote_blocks:
 218            return True
 219        for item in self:
 220            if self[item].has_remote_blocks():
 221                return True
 222        return False
 223
 224    @synchronized
 225    def set_has_remote_blocks(self, val: bool) -> None:
 226        """Cache whether this collection refers to remote blocks
 227
 228        .. ATTENTION:: Internal
 229           This method is only meant to be used by other Collection methods.
 230
 231        Set this collection's cached "has remote blocks" flag to the given
 232        value.
 233        """
 234        self._has_remote_blocks = val
 235        if self.parent:
 236            self.parent.set_has_remote_blocks(val)
 237
 238    @must_be_writable
 239    @synchronized
 240    def find_or_create(
 241            self,
 242            path: str,
 243            create_type: CreateType,
 244    ) -> CollectionItem:
 245        """Get the item at the given path, creating it if necessary
 246
 247        If `path` refers to a stream in this collection, returns a
 248        corresponding `Subcollection` object. If `path` refers to a file in
 249        this collection, returns a corresponding
 250        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 251        this collection, then this method creates a new object and returns
 252        it, creating parent streams as needed. The type of object created is
 253        determined by the value of `create_type`.
 254
 255        Arguments:
 256
 257        * path: str --- The path to find or create within this collection.
 258
 259        * create_type: Literal[COLLECTION, FILE] --- The type of object to
 260          create at `path` if one does not exist. Passing `COLLECTION`
 261          creates a stream and returns the corresponding
 262          `Subcollection`. Passing `FILE` creates a new file and returns the
 263          corresponding `arvados.arvfile.ArvadosFile`.
 264        """
 265        pathcomponents = path.split("/", 1)
 266        if pathcomponents[0]:
 267            item = self._items.get(pathcomponents[0])
 268            if len(pathcomponents) == 1:
 269                if item is None:
 270                    # create new file
 271                    if create_type == COLLECTION:
 272                        item = Subcollection(self, pathcomponents[0])
 273                    else:
 274                        item = ArvadosFile(self, pathcomponents[0])
 275                    self._items[pathcomponents[0]] = item
 276                    self.set_committed(False)
 277                    self.notify(ADD, self, pathcomponents[0], item)
 278                return item
 279            else:
 280                if item is None:
 281                    # create new collection
 282                    item = Subcollection(self, pathcomponents[0])
 283                    self._items[pathcomponents[0]] = item
 284                    self.set_committed(False)
 285                    self.notify(ADD, self, pathcomponents[0], item)
 286                if isinstance(item, RichCollectionBase):
 287                    return item.find_or_create(pathcomponents[1], create_type)
 288                else:
 289                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 290        else:
 291            return self
 292
 293    @synchronized
 294    def find(self, path: str) -> CollectionItem:
 295        """Get the item at the given path
 296
 297        If `path` refers to a stream in this collection, returns a
 298        corresponding `Subcollection` object. If `path` refers to a file in
 299        this collection, returns a corresponding
 300        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 301        this collection, then this method raises `NotADirectoryError`.
 302
 303        Arguments:
 304
 305        * path: str --- The path to find within this collection.
 306        """
 307        if not path:
 308            raise errors.ArgumentError("Parameter 'path' is empty.")
 309
 310        pathcomponents = path.split("/", 1)
 311        if pathcomponents[0] == '':
 312            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 313
 314        item = self._items.get(pathcomponents[0])
 315        if item is None:
 316            return None
 317        elif len(pathcomponents) == 1:
 318            return item
 319        else:
 320            if isinstance(item, RichCollectionBase):
 321                if pathcomponents[1]:
 322                    return item.find(pathcomponents[1])
 323                else:
 324                    return item
 325            else:
 326                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 327
 328    @synchronized
 329    def mkdirs(self, path: str) -> 'Subcollection':
 330        """Create and return a subcollection at `path`
 331
 332        If `path` exists within this collection, raises `FileExistsError`.
 333        Otherwise, creates a stream at that path and returns the
 334        corresponding `Subcollection`.
 335        """
 336        if self.find(path) != None:
 337            raise IOError(errno.EEXIST, "Directory or file exists", path)
 338
 339        return self.find_or_create(path, COLLECTION)
 340
 341    def open(
 342            self,
 343            path: str,
 344            mode: str="r",
 345            encoding: Optional[str]=None
 346    ) -> IO:
 347        """Open a file-like object within the collection
 348
 349        This method returns a file-like object that can read and/or write the
 350        file located at `path` within the collection. If you attempt to write
 351        a `path` that does not exist, the file is created with `find_or_create`.
 352        If the file cannot be opened for any other reason, this method raises
 353        `OSError` with an appropriate errno.
 354
 355        Arguments:
 356
 357        * path: str --- The path of the file to open within this collection
 358
 359        * mode: str --- The mode to open this file. Supports all the same
 360          values as `builtins.open`.
 361
 362        * encoding: str | None --- The text encoding of the file. Only used
 363          when the file is opened in text mode. The default is
 364          platform-dependent.
 365
 366        """
 367        if not re.search(r'^[rwa][bt]?\+?$', mode):
 368            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 369
 370        if mode[0] == 'r' and '+' not in mode:
 371            fclass = ArvadosFileReader
 372            arvfile = self.find(path)
 373        elif not self.writable():
 374            raise IOError(errno.EROFS, "Collection is read only")
 375        else:
 376            fclass = ArvadosFileWriter
 377            arvfile = self.find_or_create(path, FILE)
 378
 379        if arvfile is None:
 380            raise IOError(errno.ENOENT, "File not found", path)
 381        if not isinstance(arvfile, ArvadosFile):
 382            raise IOError(errno.EISDIR, "Is a directory", path)
 383
 384        if mode[0] == 'w':
 385            arvfile.truncate(0)
 386
 387        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
 388        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
 389        if 'b' not in mode:
 390            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
 391            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
 392        return f
 393
 394    def modified(self) -> bool:
 395        """Indicate whether this collection has changes not yet committed to an API server record
 396
 397        Returns `False` if this collection corresponds to a record loaded from
 398        the API server, `True` otherwise.
 399        """
 400        return not self.committed()
 401
 402    @synchronized
 403    def committed(self):
 404        """Indicate whether this collection has an API server record
 405
 406        Returns `True` if this collection corresponds to a record loaded from
 407        the API server, `False` otherwise.
 408        """
 409        return self._committed
 410
 411    @synchronized
 412    def set_committed(self, value: bool=True):
 413        """Cache whether this collection has an API server record
 414
 415        .. ATTENTION:: Internal
 416           This method is only meant to be used by other Collection methods.
 417
 418        Set this collection's cached "committed" flag to the given
 419        value and propagates it as needed.
 420        """
 421        if value == self._committed:
 422            return
 423        if value:
 424            for k,v in listitems(self._items):
 425                v.set_committed(True)
 426            self._committed = True
 427        else:
 428            self._committed = False
 429            if self.parent is not None:
 430                self.parent.set_committed(False)
 431
 432    @synchronized
 433    def __iter__(self) -> Iterator[str]:
 434        """Iterate names of streams and files in this collection
 435
 436        This method does not recurse. It only iterates the contents of this
 437        collection's corresponding stream.
 438        """
 439        return iter(viewkeys(self._items))
 440
 441    @synchronized
 442    def __getitem__(self, k: str) -> CollectionItem:
 443        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
 444
 445        This method does not recurse. If you want to search a path, use
 446        `RichCollectionBase.find` instead.
 447        """
 448        return self._items[k]
 449
 450    @synchronized
 451    def __contains__(self, k: str) -> bool:
 452        """Indicate whether this collection has an item with this name
 453
 454        This method does not recurse. If you want to check a path, use
 455        `RichCollectionBase.exists` instead.
 456        """
 457        return k in self._items
 458
 459    @synchronized
 460    def __len__(self):
 461        """Get the number of items directly contained in this collection
 462
 463        This method does not recurse. It only counts the streams and files
 464        in this collection's corresponding stream.
 465        """
 466        return len(self._items)
 467
 468    @must_be_writable
 469    @synchronized
 470    def __delitem__(self, p: str) -> None:
 471        """Delete an item from this collection's stream
 472
 473        This method does not recurse. If you want to remove an item by a
 474        path, use `RichCollectionBase.remove` instead.
 475        """
 476        del self._items[p]
 477        self.set_committed(False)
 478        self.notify(DEL, self, p, None)
 479
 480    @synchronized
 481    def keys(self) -> Iterator[str]:
 482        """Iterate names of streams and files in this collection
 483
 484        This method does not recurse. It only iterates the contents of this
 485        collection's corresponding stream.
 486        """
 487        return self._items.keys()
 488
 489    @synchronized
 490    def values(self) -> List[CollectionItem]:
 491        """Get a list of objects in this collection's stream
 492
 493        The return value includes a `Subcollection` for every stream, and an
 494        `arvados.arvfile.ArvadosFile` for every file, directly within this
 495        collection's stream.  This method does not recurse.
 496        """
 497        return listvalues(self._items)
 498
 499    @synchronized
 500    def items(self) -> List[Tuple[str, CollectionItem]]:
 501        """Get a list of `(name, object)` tuples from this collection's stream
 502
 503        The return value includes a `Subcollection` for every stream, and an
 504        `arvados.arvfile.ArvadosFile` for every file, directly within this
 505        collection's stream.  This method does not recurse.
 506        """
 507        return listitems(self._items)
 508
 509    def exists(self, path: str) -> bool:
 510        """Indicate whether this collection includes an item at `path`
 511
 512        This method returns `True` if `path` refers to a stream or file within
 513        this collection, else `False`.
 514
 515        Arguments:
 516
 517        * path: str --- The path to check for existence within this collection
 518        """
 519        return self.find(path) is not None
 520
 521    @must_be_writable
 522    @synchronized
 523    def remove(self, path: str, recursive: bool=False) -> None:
 524        """Remove the file or stream at `path`
 525
 526        Arguments:
 527
 528        * path: str --- The path of the item to remove from the collection
 529
 530        * recursive: bool --- Controls the method's behavior if `path` refers
 531          to a nonempty stream. If `False` (the default), this method raises
 532          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
 533          items under the stream.
 534        """
 535        if not path:
 536            raise errors.ArgumentError("Parameter 'path' is empty.")
 537
 538        pathcomponents = path.split("/", 1)
 539        item = self._items.get(pathcomponents[0])
 540        if item is None:
 541            raise IOError(errno.ENOENT, "File not found", path)
 542        if len(pathcomponents) == 1:
 543            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
 544                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
 545            deleteditem = self._items[pathcomponents[0]]
 546            del self._items[pathcomponents[0]]
 547            self.set_committed(False)
 548            self.notify(DEL, self, pathcomponents[0], deleteditem)
 549        else:
 550            item.remove(pathcomponents[1], recursive=recursive)
 551
 552    def _clonefrom(self, source):
 553        for k,v in listitems(source):
 554            self._items[k] = v.clone(self, k)
 555
 556    def clone(self):
 557        raise NotImplementedError()
 558
 559    @must_be_writable
 560    @synchronized
 561    def add(
 562            self,
 563            source_obj: CollectionItem,
 564            target_name: str,
 565            overwrite: bool=False,
 566            reparent: bool=False,
 567    ) -> None:
 568        """Copy or move a file or subcollection object to this collection
 569
 570        Arguments:
 571
 572        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
 573          to add to this collection
 574
 575        * target_name: str --- The path inside this collection where
 576          `source_obj` should be added.
 577
 578        * overwrite: bool --- Controls the behavior of this method when the
 579          collection already contains an object at `target_name`. If `False`
 580          (the default), this method will raise `FileExistsError`. If `True`,
 581          the object at `target_name` will be replaced with `source_obj`.
 582
 583        * reparent: bool --- Controls whether this method copies or moves
 584          `source_obj`. If `False` (the default), `source_obj` is copied into
 585          this collection. If `True`, `source_obj` is moved into this
 586          collection.
 587        """
 588        if target_name in self and not overwrite:
 589            raise IOError(errno.EEXIST, "File already exists", target_name)
 590
 591        modified_from = None
 592        if target_name in self:
 593            modified_from = self[target_name]
 594
 595        # Actually make the move or copy.
 596        if reparent:
 597            source_obj._reparent(self, target_name)
 598            item = source_obj
 599        else:
 600            item = source_obj.clone(self, target_name)
 601
 602        self._items[target_name] = item
 603        self.set_committed(False)
 604        if not self._has_remote_blocks and source_obj.has_remote_blocks():
 605            self.set_has_remote_blocks(True)
 606
 607        if modified_from:
 608            self.notify(MOD, self, target_name, (modified_from, item))
 609        else:
 610            self.notify(ADD, self, target_name, item)
 611
 612    def _get_src_target(self, source, target_path, source_collection, create_dest):
 613        if source_collection is None:
 614            source_collection = self
 615
 616        # Find the object
 617        if isinstance(source, basestring):
 618            source_obj = source_collection.find(source)
 619            if source_obj is None:
 620                raise IOError(errno.ENOENT, "File not found", source)
 621            sourcecomponents = source.split("/")
 622        else:
 623            source_obj = source
 624            sourcecomponents = None
 625
 626        # Find the parent collection for the target path
 627        targetcomponents = target_path.split("/")
 628
 629        # Determine the name to use.
 630        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 631
 632        if not target_name:
 633            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 634
 635        if create_dest:
 636            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 637        else:
 638            if len(targetcomponents) > 1:
 639                target_dir = self.find("/".join(targetcomponents[0:-1]))
 640            else:
 641                target_dir = self
 642
 643        if target_dir is None:
 644            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 645
 646        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
 647            target_dir = target_dir[target_name]
 648            target_name = sourcecomponents[-1]
 649
 650        return (source_obj, target_dir, target_name)
 651
 652    @must_be_writable
 653    @synchronized
 654    def copy(
 655            self,
 656            source: Union[str, CollectionItem],
 657            target_path: str,
 658            source_collection: Optional['RichCollectionBase']=None,
 659            overwrite: bool=False,
 660    ) -> None:
 661        """Copy a file or subcollection object to this collection
 662
 663        Arguments:
 664
 665        * source: str | arvados.arvfile.ArvadosFile |
 666          arvados.collection.Subcollection --- The file or subcollection to
 667          add to this collection. If `source` is a str, the object will be
 668          found by looking up this path from `source_collection` (see
 669          below).
 670
 671        * target_path: str --- The path inside this collection where the
 672          source object should be added.
 673
 674        * source_collection: arvados.collection.Collection | None --- The
 675          collection to find the source object from when `source` is a
 676          path. Defaults to the current collection (`self`).
 677
 678        * overwrite: bool --- Controls the behavior of this method when the
 679          collection already contains an object at `target_path`. If `False`
 680          (the default), this method will raise `FileExistsError`. If `True`,
 681          the object at `target_path` will be replaced with `source_obj`.
 682        """
 683        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 684        target_dir.add(source_obj, target_name, overwrite, False)
 685
 686    @must_be_writable
 687    @synchronized
 688    def rename(
 689            self,
 690            source: Union[str, CollectionItem],
 691            target_path: str,
 692            source_collection: Optional['RichCollectionBase']=None,
 693            overwrite: bool=False,
 694    ) -> None:
 695        """Move a file or subcollection object to this collection
 696
 697        Arguments:
 698
 699        * source: str | arvados.arvfile.ArvadosFile |
 700          arvados.collection.Subcollection --- The file or subcollection to
 701          add to this collection. If `source` is a str, the object will be
 702          found by looking up this path from `source_collection` (see
 703          below).
 704
 705        * target_path: str --- The path inside this collection where the
 706          source object should be added.
 707
 708        * source_collection: arvados.collection.Collection | None --- The
 709          collection to find the source object from when `source` is a
 710          path. Defaults to the current collection (`self`).
 711
 712        * overwrite: bool --- Controls the behavior of this method when the
 713          collection already contains an object at `target_path`. If `False`
 714          (the default), this method will raise `FileExistsError`. If `True`,
 715          the object at `target_path` will be replaced with `source_obj`.
 716        """
 717        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 718        if not source_obj.writable():
 719            raise IOError(errno.EROFS, "Source collection is read only", source)
 720        target_dir.add(source_obj, target_name, overwrite, True)
 721
 722    def portable_manifest_text(self, stream_name: str=".") -> str:
 723        """Get the portable manifest text for this collection
 724
 725        The portable manifest text is normalized, and does not include access
 726        tokens. This method does not flush outstanding blocks to Keep.
 727
 728        Arguments:
 729
 730        * stream_name: str --- The name to use for this collection's stream in
 731          the generated manifest. Default `'.'`.
 732        """
 733        return self._get_manifest_text(stream_name, True, True)
 734
 735    @synchronized
 736    def manifest_text(
 737            self,
 738            stream_name: str=".",
 739            strip: bool=False,
 740            normalize: bool=False,
 741            only_committed: bool=False,
 742    ) -> str:
 743        """Get the manifest text for this collection
 744
 745        Arguments:
 746
 747        * stream_name: str --- The name to use for this collection's stream in
 748          the generated manifest. Default `'.'`.
 749
 750        * strip: bool --- Controls whether or not the returned manifest text
 751          includes access tokens. If `False` (the default), the manifest text
 752          will include access tokens. If `True`, the manifest text will not
 753          include access tokens.
 754
 755        * normalize: bool --- Controls whether or not the returned manifest
 756          text is normalized. Default `False`.
 757
 758        * only_committed: bool --- Controls whether or not this method uploads
 759          pending data to Keep before building and returning the manifest text.
 760          If `False` (the default), this method will finish uploading all data
 761          to Keep, then return the final manifest. If `True`, this method will
 762          build and return a manifest that only refers to the data that has
 763          finished uploading at the time this method was called.
 764        """
 765        if not only_committed:
 766            self._my_block_manager().commit_all()
 767        return self._get_manifest_text(stream_name, strip, normalize,
 768                                       only_committed=only_committed)
 769
    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.

        """

        # If the collection is unmodified and normalization wasn't requested,
        # the cached manifest text can be returned as-is (see the else branch).
        if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            # Sort once; names are partitioned into files first, then
            # subcollections, each processed in sorted order.
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Build the segment list for this file
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # Bufferblocks hold data not yet committed to Keep;
                        # skip them entirely when only_committed is set,
                        # otherwise translate to the block's current locator.
                        if only_committed:
                            continue
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        # Drop signing tokens from the locator.
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                         segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            # Recurse into subcollections; their streams are named by joining
            # this stream's name with the subdirectory name.
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(
                    stream_name=os.path.join(stream_name, dirname),
                    strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            # Unmodified collection: reuse the manifest text loaded from the
            # API server, stripping tokens on the fly if requested.
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text
 823
 824    @synchronized
 825    def _copy_remote_blocks(self, remote_blocks={}):
 826        """Scan through the entire collection and ask Keep to copy remote blocks.
 827
 828        When accessing a remote collection, blocks will have a remote signature
 829        (+R instead of +A). Collect these signatures and request Keep to copy the
 830        blocks to the local cluster, returning local (+A) signatures.
 831
 832        :remote_blocks:
 833          Shared cache of remote to local block mappings. This is used to avoid
 834          doing extra work when blocks are shared by more than one file in
 835          different subdirectories.
 836
 837        """
 838        for item in self:
 839            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
 840        return remote_blocks
 841
 842    @synchronized
 843    def diff(
 844            self,
 845            end_collection: 'RichCollectionBase',
 846            prefix: str=".",
 847            holding_collection: Optional['Collection']=None,
 848    ) -> ChangeList:
 849        """Build a list of differences between this collection and another
 850
 851        Arguments:
 852
 853        * end_collection: arvados.collection.RichCollectionBase --- A
 854          collection object with the desired end state. The returned diff
 855          list will describe how to go from the current collection object
 856          `self` to `end_collection`.
 857
 858        * prefix: str --- The name to use for this collection's stream in
 859          the diff list. Default `'.'`.
 860
 861        * holding_collection: arvados.collection.Collection | None --- A
 862          collection object used to hold objects for the returned diff
 863          list. By default, a new empty collection is created.
 864        """
 865        changes = []
 866        if holding_collection is None:
 867            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
 868        for k in self:
 869            if k not in end_collection:
 870               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
 871        for k in end_collection:
 872            if k in self:
 873                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
 874                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
 875                elif end_collection[k] != self[k]:
 876                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 877                else:
 878                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 879            else:
 880                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
 881        return changes
 882
    @must_be_writable
    @synchronized
    def apply(self, changes: ChangeList) -> None:
        """Apply a list of changes to this collection

        This method takes a list of changes generated by
        `RichCollectionBase.diff` and applies it to this
        collection. Afterward, the state of this collection object will
        match the state of `end_collection` passed to `diff`. If a change
        conflicts with a local change, it will be saved to an alternate path
        indicating the conflict.

        Arguments:

        * changes: arvados.collection.ChangeList --- The list of differences
          generated by `RichCollectionBase.diff`.
        """
        # Any nonempty change list will modify the collection.
        if changes:
            self.set_committed(False)
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            # Timestamped alternate path used when the local item conflicts
            # with the incoming change.
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
 939
 940    def portable_data_hash(self) -> str:
 941        """Get the portable data hash for this collection's manifest"""
 942        if self._manifest_locator and self.committed():
 943            # If the collection is already saved on the API server, and it's committed
 944            # then return API server's PDH response.
 945            return self._portable_data_hash
 946        else:
 947            stripped = self.portable_manifest_text().encode()
 948            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 949
 950    @synchronized
 951    def subscribe(self, callback: ChangeCallback) -> None:
 952        """Set a notify callback for changes to this collection
 953
 954        Arguments:
 955
 956        * callback: arvados.collection.ChangeCallback --- The callable to
 957          call each time the collection is changed.
 958        """
 959        if self._callback is None:
 960            self._callback = callback
 961        else:
 962            raise errors.ArgumentError("A callback is already set on this collection.")
 963
 964    @synchronized
 965    def unsubscribe(self) -> None:
 966        """Remove any notify callback set for changes to this collection"""
 967        if self._callback is not None:
 968            self._callback = None
 969
 970    @synchronized
 971    def notify(
 972            self,
 973            event: ChangeType,
 974            collection: 'RichCollectionBase',
 975            name: str,
 976            item: CollectionItem,
 977    ) -> None:
 978        """Notify any subscribed callback about a change to this collection
 979
 980        .. ATTENTION:: Internal
 981           This method is only meant to be used by other Collection methods.
 982
 983        If a callback has been registered with `RichCollectionBase.subscribe`,
 984        it will be called with information about a change to this collection.
 985        Then this notification will be propagated to this collection's root.
 986
 987        Arguments:
 988
 989        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 990          the collection.
 991
 992        * collection: arvados.collection.RichCollectionBase --- The
 993          collection that was modified.
 994
 995        * name: str --- The name of the file or stream within `collection` that
 996          was modified.
 997
 998        * item: arvados.arvfile.ArvadosFile |
 999          arvados.collection.Subcollection --- The new contents at `name`
1000          within `collection`.
1001        """
1002        if self._callback:
1003            self._callback(event, collection, name, item)
1004        self.root_collection().notify(event, collection, name, item)
1005
1006    @synchronized
1007    def __eq__(self, other: Any) -> bool:
1008        """Indicate whether this collection object is equal to another"""
1009        if other is self:
1010            return True
1011        if not isinstance(other, RichCollectionBase):
1012            return False
1013        if len(self._items) != len(other):
1014            return False
1015        for k in self._items:
1016            if k not in other:
1017                return False
1018            if self._items[k] != other[k]:
1019                return False
1020        return True
1021
1022    def __ne__(self, other: Any) -> bool:
1023        """Indicate whether this collection object is not equal to another"""
1024        return not self.__eq__(other)
1025
1026    @synchronized
1027    def flush(self) -> None:
1028        """Upload any pending data to Keep"""
1029        for e in listvalues(self):
1030            e.flush()

Base class for Collection classes

RichCollectionBase(parent=None)
170    def __init__(self, parent=None):
171        self.parent = parent
172        self._committed = False
173        self._has_remote_blocks = False
174        self._callback = None
175        self._items = {}
parent
def writable(self) -> bool:
186    def writable(self) -> bool:
187        """Indicate whether this collection object can be modified
188
189        This method returns `False` if this object is a `CollectionReader`,
190        else `True`.
191        """
192        raise NotImplementedError()

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

def root_collection(self) -> Collection:
194    def root_collection(self) -> 'Collection':
195        """Get this collection's root collection object
196
197        If you open a subcollection with `Collection.find`, calling this method
198        on that subcollection returns the source Collection object.
199        """
200        raise NotImplementedError()

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def stream_name(self) -> str:
202    def stream_name(self) -> str:
203        """Get the name of the manifest stream represented by this collection
204
205        If you open a subcollection with `Collection.find`, calling this method
206        on that subcollection returns the name of the stream you opened.
207        """
208        raise NotImplementedError()

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

@synchronized
def has_remote_blocks(self) -> bool:
210    @synchronized
211    def has_remote_blocks(self) -> bool:
212        """Indicate whether the collection refers to remote data
213
214        Returns `True` if the collection manifest includes any Keep locators
215        with a remote hint (`+R`), else `False`.
216        """
217        if self._has_remote_blocks:
218            return True
219        for item in self:
220            if self[item].has_remote_blocks():
221                return True
222        return False

Indicate whether the collection refers to remote data

Returns True if the collection manifest includes any Keep locators with a remote hint (+R), else False.

@synchronized
def set_has_remote_blocks(self, val: bool) -> None:
224    @synchronized
225    def set_has_remote_blocks(self, val: bool) -> None:
226        """Cache whether this collection refers to remote blocks
227
228        .. ATTENTION:: Internal
229           This method is only meant to be used by other Collection methods.
230
231        Set this collection's cached "has remote blocks" flag to the given
232        value.
233        """
234        self._has_remote_blocks = val
235        if self.parent:
236            self.parent.set_has_remote_blocks(val)

Cache whether this collection refers to remote blocks

Set this collection’s cached “has remote blocks” flag to the given value.

@must_be_writable
@synchronized
def find_or_create( self, path: str, create_type: Literal['collection', 'file']) -> Union[arvados.arvfile.ArvadosFile, Collection]:
238    @must_be_writable
239    @synchronized
240    def find_or_create(
241            self,
242            path: str,
243            create_type: CreateType,
244    ) -> CollectionItem:
245        """Get the item at the given path, creating it if necessary
246
247        If `path` refers to a stream in this collection, returns a
248        corresponding `Subcollection` object. If `path` refers to a file in
249        this collection, returns a corresponding
250        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
251        this collection, then this method creates a new object and returns
252        it, creating parent streams as needed. The type of object created is
253        determined by the value of `create_type`.
254
255        Arguments:
256
257        * path: str --- The path to find or create within this collection.
258
259        * create_type: Literal[COLLECTION, FILE] --- The type of object to
260          create at `path` if one does not exist. Passing `COLLECTION`
261          creates a stream and returns the corresponding
262          `Subcollection`. Passing `FILE` creates a new file and returns the
263          corresponding `arvados.arvfile.ArvadosFile`.
264        """
265        pathcomponents = path.split("/", 1)
266        if pathcomponents[0]:
267            item = self._items.get(pathcomponents[0])
268            if len(pathcomponents) == 1:
269                if item is None:
270                    # create new file
271                    if create_type == COLLECTION:
272                        item = Subcollection(self, pathcomponents[0])
273                    else:
274                        item = ArvadosFile(self, pathcomponents[0])
275                    self._items[pathcomponents[0]] = item
276                    self.set_committed(False)
277                    self.notify(ADD, self, pathcomponents[0], item)
278                return item
279            else:
280                if item is None:
281                    # create new collection
282                    item = Subcollection(self, pathcomponents[0])
283                    self._items[pathcomponents[0]] = item
284                    self.set_committed(False)
285                    self.notify(ADD, self, pathcomponents[0], item)
286                if isinstance(item, RichCollectionBase):
287                    return item.find_or_create(pathcomponents[1], create_type)
288                else:
289                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
290        else:
291            return self

Get the item at the given path, creating it if necessary

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of create_type.

Arguments:

  • path: str — The path to find or create within this collection.

  • create_type: Literal[COLLECTION, FILE] — The type of object to create at path if one does not exist. Passing COLLECTION creates a stream and returns the corresponding Subcollection. Passing FILE creates a new file and returns the corresponding arvados.arvfile.ArvadosFile.

@synchronized
def find( self, path: str) -> Union[arvados.arvfile.ArvadosFile, Collection]:
293    @synchronized
294    def find(self, path: str) -> CollectionItem:
295        """Get the item at the given path
296
297        If `path` refers to a stream in this collection, returns a
298        corresponding `Subcollection` object. If `path` refers to a file in
299        this collection, returns a corresponding
300        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
301        this collection, then this method raises `NotADirectoryError`.
302
303        Arguments:
304
305        * path: str --- The path to find or create within this collection.
306        """
307        if not path:
308            raise errors.ArgumentError("Parameter 'path' is empty.")
309
310        pathcomponents = path.split("/", 1)
311        if pathcomponents[0] == '':
312            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
313
314        item = self._items.get(pathcomponents[0])
315        if item is None:
316            return None
317        elif len(pathcomponents) == 1:
318            return item
319        else:
320            if isinstance(item, RichCollectionBase):
321                if pathcomponents[1]:
322                    return item.find(pathcomponents[1])
323                else:
324                    return item
325            else:
326                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

Get the item at the given path

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method raises NotADirectoryError.

Arguments:

  • path: str — The path to find or create within this collection.
@synchronized
def mkdirs(self, path: str) -> Subcollection:
328    @synchronized
329    def mkdirs(self, path: str) -> 'Subcollection':
330        """Create and return a subcollection at `path`
331
332        If `path` exists within this collection, raises `FileExistsError`.
333        Otherwise, creates a stream at that path and returns the
334        corresponding `Subcollection`.
335        """
336        if self.find(path) != None:
337            raise IOError(errno.EEXIST, "Directory or file exists", path)
338
339        return self.find_or_create(path, COLLECTION)

Create and return a subcollection at path

If path exists within this collection, raises FileExistsError. Otherwise, creates a stream at that path and returns the corresponding Subcollection.

def open( self, path: str, mode: str = 'r', encoding: Optional[str] = None) -> <class 'IO'>:
341    def open(
342            self,
343            path: str,
344            mode: str="r",
345            encoding: Optional[str]=None
346    ) -> IO:
347        """Open a file-like object within the collection
348
349        This method returns a file-like object that can read and/or write the
350        file located at `path` within the collection. If you attempt to write
351        a `path` that does not exist, the file is created with `find_or_create`.
352        If the file cannot be opened for any other reason, this method raises
353        `OSError` with an appropriate errno.
354
355        Arguments:
356
357        * path: str --- The path of the file to open within this collection
358
359        * mode: str --- The mode to open this file. Supports all the same
360          values as `builtins.open`.
361
362        * encoding: str | None --- The text encoding of the file. Only used
363          when the file is opened in text mode. The default is
364          platform-dependent.
365
366        """
367        if not re.search(r'^[rwa][bt]?\+?$', mode):
368            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
369
370        if mode[0] == 'r' and '+' not in mode:
371            fclass = ArvadosFileReader
372            arvfile = self.find(path)
373        elif not self.writable():
374            raise IOError(errno.EROFS, "Collection is read only")
375        else:
376            fclass = ArvadosFileWriter
377            arvfile = self.find_or_create(path, FILE)
378
379        if arvfile is None:
380            raise IOError(errno.ENOENT, "File not found", path)
381        if not isinstance(arvfile, ArvadosFile):
382            raise IOError(errno.EISDIR, "Is a directory", path)
383
384        if mode[0] == 'w':
385            arvfile.truncate(0)
386
387        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
388        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
389        if 'b' not in mode:
390            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
391            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
392        return f

Open a file-like object within the collection

This method returns a file-like object that can read and/or write the file located at path within the collection. If you attempt to write a path that does not exist, the file is created with find_or_create. If the file cannot be opened for any other reason, this method raises OSError with an appropriate errno.

Arguments:

  • path: str — The path of the file to open within this collection

  • mode: str — The mode to open this file. Supports all the same values as builtins.open.

  • encoding: str | None — The text encoding of the file. Only used when the file is opened in text mode. The default is platform-dependent.

def modified(self) -> bool:
394    def modified(self) -> bool:
395        """Indicate whether this collection has an API server record
396
397        Returns `False` if this collection corresponds to a record loaded from
398        the API server, `True` otherwise.
399        """
400        return not self.committed()

Indicate whether this collection has an API server record

Returns False if this collection corresponds to a record loaded from the API server, True otherwise.

@synchronized
def committed(self):
402    @synchronized
403    def committed(self):
404        """Indicate whether this collection has an API server record
405
406        Returns `True` if this collection corresponds to a record loaded from
407        the API server, `False` otherwise.
408        """
409        return self._committed

Indicate whether this collection has an API server record

Returns True if this collection corresponds to a record loaded from the API server, False otherwise.

@synchronized
def set_committed(self, value: bool = True):
411    @synchronized
412    def set_committed(self, value: bool=True):
413        """Cache whether this collection has an API server record
414
415        .. ATTENTION:: Internal
416           This method is only meant to be used by other Collection methods.
417
418        Set this collection's cached "committed" flag to the given
419        value and propagates it as needed.
420        """
421        if value == self._committed:
422            return
423        if value:
424            for k,v in listitems(self._items):
425                v.set_committed(True)
426            self._committed = True
427        else:
428            self._committed = False
429            if self.parent is not None:
430                self.parent.set_committed(False)

Cache whether this collection has an API server record

Set this collection’s cached “committed” flag to the given value and propagates it as needed.

@synchronized
def keys(self) -> Iterator[str]:
    """Iterate names of streams and files in this collection

    Only the direct contents of this collection's stream are listed;
    this method does not recurse into subcollections.
    """
    contents = self._items
    return contents.keys()

Iterate names of streams and files in this collection

This method does not recurse. It only iterates the contents of this collection’s corresponding stream.

@synchronized
def values(self) -> List[CollectionItem]:
    """Get a list of objects in this collection's stream

    The return value includes a `Subcollection` for every stream, and an
    `arvados.arvfile.ArvadosFile` for every file, directly within this
    collection's stream.  This method does not recurse.
    """
    # Plain Python 3 idiom; replaces the python-future listvalues() shim.
    return list(self._items.values())

Get a list of objects in this collection’s stream

The return value includes a Subcollection for every stream, and an arvados.arvfile.ArvadosFile for every file, directly within this collection’s stream. This method does not recurse.

@synchronized
def items(self) -> List[Tuple[str, CollectionItem]]:
    """Get a list of `(name, object)` tuples from this collection's stream

    The return value includes a `Subcollection` for every stream, and an
    `arvados.arvfile.ArvadosFile` for every file, directly within this
    collection's stream.  This method does not recurse.
    """
    # Plain Python 3 idiom; replaces the python-future listitems() shim.
    return list(self._items.items())

Get a list of (name, object) tuples from this collection’s stream

The return value includes a Subcollection for every stream, and an arvados.arvfile.ArvadosFile for every file, directly within this collection’s stream. This method does not recurse.

def exists(self, path: str) -> bool:
    """Indicate whether this collection includes an item at `path`

    Returns `True` if `path` refers to a stream or file within this
    collection, else `False`.

    Arguments:

    * path: str --- The path to check for existence within this collection
    """
    found = self.find(path)
    return found is not None

Indicate whether this collection includes an item at path

This method returns True if path refers to a stream or file within this collection, else False.

Arguments:

  • path: str — The path to check for existence within this collection
@must_be_writable
@synchronized
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the file or stream at `path`

    Arguments:

    * path: str --- The path of the item to remove from the collection

    * recursive: bool --- Controls the method's behavior if `path` refers
      to a nonempty stream. If `False` (the default), this method raises
      `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
      items under the stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    pathcomponents = path.split("/", 1)
    item = self._items.get(pathcomponents[0])
    if item is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if len(pathcomponents) == 1:
        # Reuse the already-fetched item instead of re-looking it up
        # in self._items three more times.
        if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
            raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
        del self._items[pathcomponents[0]]
        self.set_committed(False)
        self.notify(DEL, self, pathcomponents[0], item)
    else:
        # Delegate removal of the remainder of the path to the child.
        item.remove(pathcomponents[1], recursive=recursive)

Remove the file or stream at path

Arguments:

  • path: str — The path of the item to remove from the collection

  • recursive: bool — Controls the method’s behavior if path refers to a nonempty stream. If False (the default), this method raises OSError with errno ENOTEMPTY. If True, this method removes all items under the stream.

def clone(self):
    """Unimplemented here; concrete subclasses provide their own clone."""
    raise NotImplementedError()
@must_be_writable
@synchronized
def add(
        self,
        source_obj: CollectionItem,
        target_name: str,
        overwrite: bool=False,
        reparent: bool=False,
) -> None:
    """Copy or move a file or subcollection object to this collection

    Arguments:

    * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file
      or subcollection to add to this collection

    * target_name: str --- The path inside this collection where
      `source_obj` should be added.

    * overwrite: bool --- What to do when this collection already contains
      an object at `target_name`: raise `FileExistsError` if `False` (the
      default), or replace it with `source_obj` if `True`.

    * reparent: bool --- Whether to copy or move `source_obj`. `False`
      (the default) copies it into this collection; `True` moves it.
    """
    if target_name in self:
        if not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)
        # Remember what we are replacing so subscribers see a MOD event.
        modified_from = self[target_name]
    else:
        modified_from = None

    # Move when reparenting; otherwise install a copy.
    if reparent:
        source_obj._reparent(self, target_name)
        new_item = source_obj
    else:
        new_item = source_obj.clone(self, target_name)

    self._items[target_name] = new_item
    self.set_committed(False)
    if not self._has_remote_blocks and source_obj.has_remote_blocks():
        self.set_has_remote_blocks(True)

    # Report either a replacement or a plain addition.
    if modified_from:
        self.notify(MOD, self, target_name, (modified_from, new_item))
    else:
        self.notify(ADD, self, target_name, new_item)

Copy or move a file or subcollection object to this collection

Arguments:

  • source_obj: arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection

  • target_name: str — The path inside this collection where source_obj should be added.

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_name. If False (the default), this method will raise FileExistsError. If True, the object at target_name will be replaced with source_obj.

  • reparent: bool — Controls whether this method copies or moves source_obj. If False (the default), source_obj is copied into this collection. If True, source_obj is moved into this collection.

@must_be_writable
@synchronized
def copy(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Copy a file or subcollection object to this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      copy into this collection. A str is treated as a path looked up in
      `source_collection` (see below).

    * target_path: str --- The destination path inside this collection.

    * source_collection: arvados.collection.Collection | None --- The
      collection used to resolve `source` when it is a path. Defaults to
      this collection (`self`).

    * overwrite: bool --- What to do when this collection already contains
      an object at `target_path`: raise `FileExistsError` if `False` (the
      default), or replace it if `True`.
    """
    # Final argument is True for copy, False for move (compare `rename`).
    src_obj, dest_dir, dest_name = self._get_src_target(source, target_path, source_collection, True)
    dest_dir.add(src_obj, dest_name, overwrite, False)

Copy a file or subcollection object to this collection

Arguments:

  • source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If source is a str, the object will be found by looking up this path from source_collection (see below).

  • target_path: str — The path inside this collection where the source object should be added.

  • source_collection: Collection | None — The collection to find the source object from when source is a path. Defaults to the current collection (self).

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_path. If False (the default), this method will raise FileExistsError. If True, the object at target_path will be replaced with source_obj.

@must_be_writable
@synchronized
def rename(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Move a file or subcollection object to this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      move into this collection. A str is treated as a path looked up in
      `source_collection` (see below).

    * target_path: str --- The destination path inside this collection.

    * source_collection: arvados.collection.Collection | None --- The
      collection used to resolve `source` when it is a path. Defaults to
      this collection (`self`).

    * overwrite: bool --- What to do when this collection already contains
      an object at `target_path`: raise `FileExistsError` if `False` (the
      default), or replace it if `True`.
    """
    # Final argument is False for move, True for copy (compare `copy`).
    src_obj, dest_dir, dest_name = self._get_src_target(source, target_path, source_collection, False)
    # Moving mutates the source, so it must live in a writable collection.
    if not src_obj.writable():
        raise IOError(errno.EROFS, "Source collection is read only", source)
    dest_dir.add(src_obj, dest_name, overwrite, True)

Move a file or subcollection object to this collection

Arguments:

  • source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If source is a str, the object will be found by looking up this path from source_collection (see below).

  • target_path: str — The path inside this collection where the source object should be added.

  • source_collection: Collection | None — The collection to find the source object from when source is a path. Defaults to the current collection (self).

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_path. If False (the default), this method will raise FileExistsError. If True, the object at target_path will be replaced with source_obj.

def portable_manifest_text(self, stream_name: str=".") -> str:
    """Get the portable manifest text for this collection

    The portable manifest text is normalized, and does not include access
    tokens. This method does not flush outstanding blocks to Keep.

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.
    """
    # strip=True drops access tokens; normalize=True canonicalizes order.
    strip = normalize = True
    return self._get_manifest_text(stream_name, strip, normalize)

Get the portable manifest text for this collection

The portable manifest text is normalized, and does not include access tokens. This method does not flush outstanding blocks to Keep.

Arguments:

  • stream_name: str — The name to use for this collection’s stream in the generated manifest. Default '.'.
@synchronized
def manifest_text(
        self,
        stream_name: str=".",
        strip: bool=False,
        normalize: bool=False,
        only_committed: bool=False,
) -> str:
    """Get the manifest text for this collection

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.

    * strip: bool --- Whether the returned manifest text includes access
      tokens. `False` (the default) keeps tokens; `True` omits them.

    * normalize: bool --- Whether the returned manifest text is
      normalized. Default `False`.

    * only_committed: bool --- Whether to upload pending data to Keep
      first. `False` (the default) finishes uploading all data to Keep and
      returns the final manifest. `True` builds a manifest that refers
      only to data already uploaded when this method was called.
    """
    if only_committed:
        # Build a manifest from whatever has already reached Keep.
        return self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=True)
    # Finish uploading all pending data first, then build the manifest.
    self._my_block_manager().commit_all()
    return self._get_manifest_text(stream_name, strip, normalize,
                                   only_committed=False)

Get the manifest text for this collection

Arguments:

  • stream_name: str — The name to use for this collection’s stream in the generated manifest. Default '.'.

  • strip: bool — Controls whether or not the returned manifest text includes access tokens. If False (the default), the manifest text will include access tokens. If True, the manifest text will not include access tokens.

  • normalize: bool — Controls whether or not the returned manifest text is normalized. Default False.

  • only_committed: bool — Controls whether or not this method uploads pending data to Keep before building and returning the manifest text. If False (the default), this method will finish uploading all data to Keep, then return the final manifest. If True, this method will build and return a manifest that only refers to the data that has finished uploading at the time this method was called.

@synchronized
def diff(
        self,
        end_collection: 'RichCollectionBase',
        prefix: str=".",
        holding_collection: Optional['Collection']=None,
) -> ChangeList:
    """Build a list of differences between this collection and another

    Arguments:

    * end_collection: arvados.collection.RichCollectionBase --- A
      collection object with the desired end state. The returned diff
      list describes how to go from the current collection object
      `self` to `end_collection`.

    * prefix: str --- The name to use for this collection's stream in
      the diff list. Default `'.'`.

    * holding_collection: arvados.collection.Collection | None --- A
      collection object used to hold objects for the returned diff
      list. By default, a new empty collection is created.
    """
    if holding_collection is None:
        holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
    changes = []
    # Entries present here but absent from the end state are deletions.
    for name in self:
        if name not in end_collection:
            changes.append((DEL, os.path.join(prefix, name), self[name].clone(holding_collection, "")))
    # Walk the end state: recurse into shared streams, record modified,
    # token-only-changed, and newly added entries.
    for name in end_collection:
        entry_path = os.path.join(prefix, name)
        if name not in self:
            changes.append((ADD, entry_path, end_collection[name].clone(holding_collection, "")))
        elif isinstance(end_collection[name], Subcollection) and isinstance(self[name], Subcollection):
            changes.extend(self[name].diff(end_collection[name], entry_path, holding_collection))
        elif end_collection[name] != self[name]:
            changes.append((MOD, entry_path, self[name].clone(holding_collection, ""), end_collection[name].clone(holding_collection, "")))
        else:
            changes.append((TOK, entry_path, self[name].clone(holding_collection, ""), end_collection[name].clone(holding_collection, "")))
    return changes

Build a list of differences between this collection and another

Arguments:

  • end_collection: RichCollectionBase — A collection object with the desired end state. The returned diff list will describe how to go from the current collection object self to end_collection.

  • prefix: str — The name to use for this collection’s stream in the diff list. Default '.'.

  • holding_collection: Collection | None — A collection object used to hold objects for the returned diff list. By default, a new empty collection is created.

@must_be_writable
@synchronized
def apply(self, changes: ChangeList) -> None:
    """Apply a list of changes from `RichCollectionBase.diff` to this collection

    This method takes a list of changes generated by
    `RichCollectionBase.diff` and applies it to this
    collection. Afterward, the state of this collection object will
    match the state of `end_collection` passed to `diff`. If a change
    conflicts with a local change, it will be saved to an alternate path
    indicating the conflict.

    Arguments:

    * changes: arvados.collection.ChangeList --- The list of differences
      generated by `RichCollectionBase.diff`.
    """
    if changes:
        # Applying any change means this object no longer matches its
        # API server record.
        self.set_committed(False)
    for change in changes:
        # Each change is a tuple: (event_type, path, initial[, final]).
        event_type = change[0]
        path = change[1]
        initial = change[2]
        local = self.find(path)
        conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                time.gmtime()))
        if event_type == ADD:
            if local is None:
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
            elif local is not None and local != initial:
                # There is already local file and it is different:
                # save change to conflict file.
                self.copy(initial, conflictpath)
        elif event_type == MOD or event_type == TOK:
            final = change[3]
            if local == initial:
                # Local matches the "initial" item so it has not
                # changed locally and is safe to update.
                if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                    # Replace contents of local file with new contents
                    local.replace_contents(final)
                else:
                    # Overwrite path with new item; this can happen if
                    # path was a file and is now a collection or vice versa
                    self.copy(final, path, overwrite=True)
            else:
                # Local is missing (presumably deleted) or local doesn't
                # match the "start" value, so save change to conflict file
                self.copy(final, conflictpath)
        elif event_type == DEL:
            if local == initial:
                # Local item matches "initial" value, so it is safe to remove.
                self.remove(path, recursive=True)
            # else, the file is modified or already removed, in either
            # case we don't want to try to remove it.

Apply a list of changes to this collection

This method takes a list of changes generated by RichCollectionBase.diff and applies it to this collection. Afterward, the state of this collection object will match the state of end_collection passed to diff. If a change conflicts with a local change, it will be saved to an alternate path indicating the conflict.

Arguments:

  • changes: ChangeList — The list of differences generated by RichCollectionBase.diff.

def portable_data_hash(self) -> str:
    """Get the portable data hash for this collection's manifest"""
    if self._manifest_locator and self.committed():
        # Saved and unchanged: trust the PDH the API server reported.
        return self._portable_data_hash
    # Otherwise compute md5-of-manifest plus its byte length locally.
    manifest = self.portable_manifest_text().encode()
    return '%s+%d' % (hashlib.md5(manifest).hexdigest(), len(manifest))

Get the portable data hash for this collection’s manifest

@synchronized
def subscribe(self, callback: ChangeCallback) -> None:
    """Set a notify callback for changes to this collection

    Arguments:

    * callback: arvados.collection.ChangeCallback --- The callable to
      invoke each time the collection is changed.
    """
    # Only one subscriber is supported at a time.
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback

Set a notify callback for changes to this collection

Arguments:

  • callback: ChangeCallback — The callable to call each time the collection is changed.

@synchronized
def unsubscribe(self) -> None:
    """Remove any notify callback set for changes to this collection"""
    # Clearing is idempotent: resetting an already-None callback is harmless.
    self._callback = None

Remove any notify callback set for changes to this collection

@synchronized
def notify(
        self,
        event: ChangeType,
        collection: 'RichCollectionBase',
        name: str,
        item: CollectionItem,
) -> None:
    """Notify any subscribed callback about a change to this collection

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    Invokes the callback registered via `RichCollectionBase.subscribe`, if
    any, with the details of the change, then forwards the same
    notification to this collection's root.

    Arguments:

    * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
      the collection.

    * collection: arvados.collection.RichCollectionBase --- The
      collection that was modified.

    * name: str --- The name of the file or stream within `collection`
      that was modified.

    * item: arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The new contents at `name`
      within `collection`.
    """
    callback = self._callback
    if callback:
        callback(event, collection, name, item)
    # Propagate upward so the root collection's subscriber also hears it.
    self.root_collection().notify(event, collection, name, item)

Notify any subscribed callback about a change to this collection

If a callback has been registered with RichCollectionBase.subscribe, it will be called with information about a change to this collection. Then this notification will be propagated to this collection’s root.

Arguments:

  • event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.

  • collection: RichCollectionBase — The collection that was modified.

  • name: str — The name of the file or stream within collection that was modified.

  • item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at name within collection.

@synchronized
def flush(self) -> None:
    """Upload any pending data to Keep

    Asks every file and subcollection directly within this collection's
    stream to flush itself.
    """
    # self.values() already returns a list on Python 3, so the
    # python-future listvalues() shim is unnecessary.
    for entry in self.values():
        entry.flush()

Upload any pending data to Keep

Inherited Members
CollectionBase
stripped_manifest
class Collection(RichCollectionBase):
1033class Collection(RichCollectionBase):
1034    """Read and manipulate an Arvados collection
1035
1036    This class provides a high-level interface to create, read, and update
1037    Arvados collections and their contents. Refer to the Arvados Python SDK
1038    cookbook for [an introduction to using the Collection class][cookbook].
1039
1040    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1041    """
1042
1043    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1044                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1045                 keep_client: Optional['arvados.keep.KeepClient']=None,
1046                 num_retries: int=10,
1047                 parent: Optional['Collection']=None,
1048                 apiconfig: Optional[Mapping[str, str]]=None,
1049                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1050                 replication_desired: Optional[int]=None,
1051                 storage_classes_desired: Optional[List[str]]=None,
1052                 put_threads: Optional[int]=None):
1053        """Initialize a Collection object
1054
1055        Arguments:
1056
1057        * manifest_locator_or_text: str | None --- This string can contain a
1058          collection manifest text, portable data hash, or UUID. When given a
1059          portable data hash or UUID, this instance will load a collection
1060          record from the API server. Otherwise, this instance will represent a
1061          new collection without an API server record. The default value `None`
1062          instantiates a new collection with an empty manifest.
1063
1064        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1065          Arvados API client object this instance uses to make requests. If
1066          none is given, this instance creates its own client using the
1067          settings from `apiconfig` (see below). If your client instantiates
1068          many Collection objects, you can help limit memory utilization by
1069          calling `arvados.api.api` to construct an
1070          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1071          for every Collection.
1072
1073        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1074          object this instance uses to make requests. If none is given, this
1075          instance creates its own client using its `api_client`.
1076
1077        * num_retries: int --- The number of times that client requests are
1078          retried. Default 10.
1079
1080        * parent: arvados.collection.Collection | None --- The parent Collection
1081          object of this instance, if any. This argument is primarily used by
1082          other Collection methods; user client code shouldn't need to use it.
1083
1084        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1085          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1086          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1087          Collection object constructs one from these settings. If no
1088          mapping is provided, calls `arvados.config.settings` to get these
1089          parameters from user configuration.
1090
1091        * block_manager: arvados.arvfile._BlockManager | None --- The
1092          _BlockManager object used by this instance to coordinate reading
1093          and writing Keep data blocks. If none is given, this instance
1094          constructs its own. This argument is primarily used by other
1095          Collection methods; user client code shouldn't need to use it.
1096
1097        * replication_desired: int | None --- This controls both the value of
1098          the `replication_desired` field on API collection records saved by
1099          this class, as well as the number of Keep services that the object
1100          writes new data blocks to. If none is given, uses the default value
1101          configured for the cluster.
1102
1103        * storage_classes_desired: list[str] | None --- This controls both
1104          the value of the `storage_classes_desired` field on API collection
1105          records saved by this class, as well as selecting which specific
1106          Keep services the object writes new data blocks to. If none is
1107          given, defaults to an empty list.
1108
1109        * put_threads: int | None --- The number of threads to run
1110          simultaneously to upload data blocks to Keep. This value is used when
1111          building a new `block_manager`. It is unused when a `block_manager`
1112          is provided.
1113        """
1114
1115        if storage_classes_desired and type(storage_classes_desired) is not list:
1116            raise errors.ArgumentError("storage_classes_desired must be list type.")
1117
1118        super(Collection, self).__init__(parent)
1119        self._api_client = api_client
1120        self._keep_client = keep_client
1121
1122        # Use the keep client from ThreadSafeApiCache
1123        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1124            self._keep_client = self._api_client.keep
1125
1126        self._block_manager = block_manager
1127        self.replication_desired = replication_desired
1128        self._storage_classes_desired = storage_classes_desired
1129        self.put_threads = put_threads
1130
1131        if apiconfig:
1132            self._config = apiconfig
1133        else:
1134            self._config = config.settings()
1135
1136        self.num_retries = num_retries
1137        self._manifest_locator = None
1138        self._manifest_text = None
1139        self._portable_data_hash = None
1140        self._api_response = None
1141        self._past_versions = set()
1142
1143        self.lock = threading.RLock()
1144        self.events = None
1145
1146        if manifest_locator_or_text:
1147            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1148                self._manifest_locator = manifest_locator_or_text
1149            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1150                self._manifest_locator = manifest_locator_or_text
1151                if not self._has_local_collection_uuid():
1152                    self._has_remote_blocks = True
1153            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1154                self._manifest_text = manifest_locator_or_text
1155                if '+R' in self._manifest_text:
1156                    self._has_remote_blocks = True
1157            else:
1158                raise errors.ArgumentError(
1159                    "Argument to CollectionReader is not a manifest or a collection UUID")
1160
1161            try:
1162                self._populate()
1163            except errors.SyntaxError as e:
1164                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1165
1166    def storage_classes_desired(self) -> List[str]:
1167        """Get this collection's `storage_classes_desired` value"""
1168        return self._storage_classes_desired or []
1169
1170    def root_collection(self) -> 'Collection':
1171        return self
1172
1173    def get_properties(self) -> Properties:
1174        """Get this collection's properties
1175
1176        This method always returns a dict. If this collection object does not
1177        have an associated API record, or that record does not have any
1178        properties set, this method returns an empty dict.
1179        """
1180        if self._api_response and self._api_response["properties"]:
1181            return self._api_response["properties"]
1182        else:
1183            return {}
1184
1185    def get_trash_at(self) -> Optional[datetime.datetime]:
1186        """Get this collection's `trash_at` field
1187
1188        This method parses the `trash_at` field of the collection's API
1189        record and returns a datetime from it. If that field is not set, or
1190        this collection object does not have an associated API record,
1191        returns None.
1192        """
1193        if self._api_response and self._api_response["trash_at"]:
1194            try:
1195                return ciso8601.parse_datetime(self._api_response["trash_at"])
1196            except ValueError:
1197                return None
1198        else:
1199            return None
1200
1201    def stream_name(self) -> str:
1202        return "."
1203
1204    def writable(self) -> bool:
1205        return True
1206
1207    @synchronized
1208    def known_past_version(
1209            self,
1210            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1211    ) -> bool:
1212        """Indicate whether an API record for this collection has been seen before
1213
1214        As this collection object loads records from the API server, it records
1215        their `modified_at` and `portable_data_hash` fields. This method accepts
1216        a 2-tuple with values for those fields, and returns `True` if the
1217        combination was previously loaded.
1218        """
1219        return modified_at_and_portable_data_hash in self._past_versions
1220
    @synchronized
    @retry_method
    def update(
            self,
            other: Optional['Collection']=None,
            num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it.  If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
        """
        if other is None:
            # No explicit merge source: reload our own API record and merge
            # any server-side changes.
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._remember_api_response(response)
            other = CollectionReader(response["manifest_text"])
        # Diff our last-known manifest against `other`, then apply those
        # changes on top of this instance's current (possibly modified) state.
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        # Record the merged state as the new baseline for future updates.
        self._manifest_text = self.manifest_text()
1269
1270    @synchronized
1271    def _my_api(self):
1272        if self._api_client is None:
1273            self._api_client = ThreadSafeApiCache(self._config, version='v1')
1274            if self._keep_client is None:
1275                self._keep_client = self._api_client.keep
1276        return self._api_client
1277
1278    @synchronized
1279    def _my_keep(self):
1280        if self._keep_client is None:
1281            if self._api_client is None:
1282                self._my_api()
1283            else:
1284                self._keep_client = KeepClient(api_client=self._api_client)
1285        return self._keep_client
1286
1287    @synchronized
1288    def _my_block_manager(self):
1289        if self._block_manager is None:
1290            copies = (self.replication_desired or
1291                      self._my_api()._rootDesc.get('defaultCollectionReplication',
1292                                                   2))
1293            self._block_manager = _BlockManager(self._my_keep(),
1294                                                copies=copies,
1295                                                put_threads=self.put_threads,
1296                                                num_retries=self.num_retries,
1297                                                storage_classes_func=self.storage_classes_desired)
1298        return self._block_manager
1299
1300    def _remember_api_response(self, response):
1301        self._api_response = response
1302        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1303
1304    def _populate_from_api_server(self):
1305        # As in KeepClient itself, we must wait until the last
1306        # possible moment to instantiate an API client, in order to
1307        # avoid tripping up clients that don't have access to an API
1308        # server.  If we do build one, make sure our Keep client uses
1309        # it.  If instantiation fails, we'll fall back to the except
1310        # clause, just like any other Collection lookup
1311        # failure. Return an exception, or None if successful.
1312        self._remember_api_response(self._my_api().collections().get(
1313            uuid=self._manifest_locator).execute(
1314                num_retries=self.num_retries))
1315        self._manifest_text = self._api_response['manifest_text']
1316        self._portable_data_hash = self._api_response['portable_data_hash']
1317        # If not overriden via kwargs, we should try to load the
1318        # replication_desired and storage_classes_desired from the API server
1319        if self.replication_desired is None:
1320            self.replication_desired = self._api_response.get('replication_desired', None)
1321        if self._storage_classes_desired is None:
1322            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1323
1324    def _populate(self):
1325        if self._manifest_text is None:
1326            if self._manifest_locator is None:
1327                return
1328            else:
1329                self._populate_from_api_server()
1330        self._baseline_manifest = self._manifest_text
1331        self._import_manifest(self._manifest_text)
1332
1333    def _has_collection_uuid(self):
1334        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1335
1336    def _has_local_collection_uuid(self):
1337        return self._has_collection_uuid and \
1338            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1339
1340    def __enter__(self):
1341        return self
1342
1343    def __exit__(self, exc_type, exc_value, traceback):
1344        """Exit a context with this collection instance
1345
1346        If no exception was raised inside the context block, and this
1347        collection is writable and has a corresponding API record, that
1348        record will be updated to match the state of this instance at the end
1349        of the block.
1350        """
1351        if exc_type is None:
1352            if self.writable() and self._has_collection_uuid():
1353                self.save()
1354        self.stop_threads()
1355
1356    def stop_threads(self) -> None:
1357        """Stop background Keep upload/download threads"""
1358        if self._block_manager is not None:
1359            self._block_manager.stop_threads()
1360
1361    @synchronized
1362    def manifest_locator(self) -> Optional[str]:
1363        """Get this collection's manifest locator, if any
1364
1365        * If this collection instance is associated with an API record with a
1366          UUID, return that.
1367        * Otherwise, if this collection instance was loaded from an API record
1368          by portable data hash, return that.
1369        * Otherwise, return `None`.
1370        """
1371        return self._manifest_locator
1372
1373    @synchronized
1374    def clone(
1375            self,
1376            new_parent: Optional['Collection']=None,
1377            new_name: Optional[str]=None,
1378            readonly: bool=False,
1379            new_config: Optional[Mapping[str, str]]=None,
1380    ) -> 'Collection':
1381        """Create a Collection object with the same contents as this instance
1382
1383        This method creates a new Collection object with contents that match
1384        this instance's. The new collection will not be associated with any API
1385        record.
1386
1387        Arguments:
1388
1389        * new_parent: arvados.collection.Collection | None --- This value is
1390          passed to the new Collection's constructor as the `parent`
1391          argument.
1392
1393        * new_name: str | None --- This value is unused.
1394
1395        * readonly: bool --- If this value is true, this method constructs and
1396          returns a `CollectionReader`. Otherwise, it returns a mutable
1397          `Collection`. Default `False`.
1398
1399        * new_config: Mapping[str, str] | None --- This value is passed to the
1400          new Collection's constructor as `apiconfig`. If no value is provided,
1401          defaults to the configuration passed to this instance's constructor.
1402        """
1403        if new_config is None:
1404            new_config = self._config
1405        if readonly:
1406            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1407        else:
1408            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1409
1410        newcollection._clonefrom(self)
1411        return newcollection
1412
1413    @synchronized
1414    def api_response(self) -> Optional[Dict[str, Any]]:
1415        """Get this instance's associated API record
1416
1417        If this Collection instance has an associated API record, return it.
1418        Otherwise, return `None`.
1419        """
1420        return self._api_response
1421
1422    def find_or_create(
1423            self,
1424            path: str,
1425            create_type: CreateType,
1426    ) -> CollectionItem:
1427        if path == ".":
1428            return self
1429        else:
1430            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1431
1432    def find(self, path: str) -> CollectionItem:
1433        if path == ".":
1434            return self
1435        else:
1436            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1437
1438    def remove(self, path: str, recursive: bool=False) -> None:
1439        if path == ".":
1440            raise errors.ArgumentError("Cannot remove '.'")
1441        else:
1442            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1443
1444    @must_be_writable
1445    @synchronized
1446    @retry_method
1447    def save(
1448            self,
1449            properties: Optional[Properties]=None,
1450            storage_classes: Optional[StorageClasses]=None,
1451            trash_at: Optional[datetime.datetime]=None,
1452            merge: bool=True,
1453            num_retries: Optional[int]=None,
1454            preserve_version: bool=False,
1455    ) -> str:
1456        """Save collection to an existing API record
1457
1458        This method updates the instance's corresponding API record to match
1459        the instance's state. If this instance does not have a corresponding API
1460        record yet, raises `AssertionError`. (To create a new API record, use
1461        `Collection.save_new`.) This method returns the saved collection
1462        manifest.
1463
1464        Arguments:
1465
1466        * properties: dict[str, Any] | None --- If provided, the API record will
1467          be updated with these properties. Note this will completely replace
1468          any existing properties.
1469
1470        * storage_classes: list[str] | None --- If provided, the API record will
1471          be updated with this value in the `storage_classes_desired` field.
1472          This value will also be saved on the instance and used for any
1473          changes that follow.
1474
1475        * trash_at: datetime.datetime | None --- If provided, the API record
1476          will be updated with this value in the `trash_at` field.
1477
1478        * merge: bool --- If `True` (the default), this method will first
1479          reload this collection's API record, and merge any new contents into
1480          this instance before saving changes. See `Collection.update` for
1481          details.
1482
1483        * num_retries: int | None --- The number of times to retry reloading
1484          the collection's API record from the API server. If not specified,
1485          uses the `num_retries` provided when this instance was constructed.
1486
1487        * preserve_version: bool --- This value will be passed to directly
1488          to the underlying API call. If `True`, the Arvados API will
1489          preserve the versions of this collection both immediately before
1490          and after the update. If `True` when the API server is not
1491          configured with collection versioning, this method raises
1492          `arvados.errors.ArgumentError`.
1493        """
1494        if properties and type(properties) is not dict:
1495            raise errors.ArgumentError("properties must be dictionary type.")
1496
1497        if storage_classes and type(storage_classes) is not list:
1498            raise errors.ArgumentError("storage_classes must be list type.")
1499        if storage_classes:
1500            self._storage_classes_desired = storage_classes
1501
1502        if trash_at and type(trash_at) is not datetime.datetime:
1503            raise errors.ArgumentError("trash_at must be datetime type.")
1504
1505        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1506            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1507
1508        body={}
1509        if properties:
1510            body["properties"] = properties
1511        if self.storage_classes_desired():
1512            body["storage_classes_desired"] = self.storage_classes_desired()
1513        if trash_at:
1514            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1515            body["trash_at"] = t
1516        if preserve_version:
1517            body["preserve_version"] = preserve_version
1518
1519        if not self.committed():
1520            if self._has_remote_blocks:
1521                # Copy any remote blocks to the local cluster.
1522                self._copy_remote_blocks(remote_blocks={})
1523                self._has_remote_blocks = False
1524            if not self._has_collection_uuid():
1525                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1526            elif not self._has_local_collection_uuid():
1527                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1528
1529            self._my_block_manager().commit_all()
1530
1531            if merge:
1532                self.update()
1533
1534            text = self.manifest_text(strip=False)
1535            body['manifest_text'] = text
1536
1537            self._remember_api_response(self._my_api().collections().update(
1538                uuid=self._manifest_locator,
1539                body=body
1540                ).execute(num_retries=num_retries))
1541            self._manifest_text = self._api_response["manifest_text"]
1542            self._portable_data_hash = self._api_response["portable_data_hash"]
1543            self.set_committed(True)
1544        elif body:
1545            self._remember_api_response(self._my_api().collections().update(
1546                uuid=self._manifest_locator,
1547                body=body
1548                ).execute(num_retries=num_retries))
1549
1550        return self._manifest_text
1551
1552
1553    @must_be_writable
1554    @synchronized
1555    @retry_method
1556    def save_new(
1557            self,
1558            name: Optional[str]=None,
1559            create_collection_record: bool=True,
1560            owner_uuid: Optional[str]=None,
1561            properties: Optional[Properties]=None,
1562            storage_classes: Optional[StorageClasses]=None,
1563            trash_at: Optional[datetime.datetime]=None,
1564            ensure_unique_name: bool=False,
1565            num_retries: Optional[int]=None,
1566            preserve_version: bool=False,
1567    ):
1568        """Save collection to a new API record
1569
1570        This method finishes uploading new data blocks and (optionally)
1571        creates a new API collection record with the provided data. If a new
1572        record is created, this instance becomes associated with that record
1573        for future updates like `save()`. This method returns the saved
1574        collection manifest.
1575
1576        Arguments:
1577
1578        * name: str | None --- The `name` field to use on the new collection
1579          record. If not specified, a generic default name is generated.
1580
1581        * create_collection_record: bool --- If `True` (the default), creates a
1582          collection record on the API server. If `False`, the method finishes
1583          all data uploads and only returns the resulting collection manifest
1584          without sending it to the API server.
1585
1586        * owner_uuid: str | None --- The `owner_uuid` field to use on the
1587          new collection record.
1588
1589        * properties: dict[str, Any] | None --- The `properties` field to use on
1590          the new collection record.
1591
1592        * storage_classes: list[str] | None --- The
1593          `storage_classes_desired` field to use on the new collection record.
1594
1595        * trash_at: datetime.datetime | None --- The `trash_at` field to use
1596          on the new collection record.
1597
1598        * ensure_unique_name: bool --- This value is passed directly to the
1599          Arvados API when creating the collection record. If `True`, the API
1600          server may modify the submitted `name` to ensure the collection's
1601          `name`+`owner_uuid` combination is unique. If `False` (the default),
1602          if a collection already exists with this same `name`+`owner_uuid`
1603          combination, creating a collection record will raise a validation
1604          error.
1605
1606        * num_retries: int | None --- The number of times to retry reloading
1607          the collection's API record from the API server. If not specified,
1608          uses the `num_retries` provided when this instance was constructed.
1609
1610        * preserve_version: bool --- This value will be passed to directly
1611          to the underlying API call. If `True`, the Arvados API will
1612          preserve the versions of this collection both immediately before
1613          and after the update. If `True` when the API server is not
1614          configured with collection versioning, this method raises
1615          `arvados.errors.ArgumentError`.
1616        """
1617        if properties and type(properties) is not dict:
1618            raise errors.ArgumentError("properties must be dictionary type.")
1619
1620        if storage_classes and type(storage_classes) is not list:
1621            raise errors.ArgumentError("storage_classes must be list type.")
1622
1623        if trash_at and type(trash_at) is not datetime.datetime:
1624            raise errors.ArgumentError("trash_at must be datetime type.")
1625
1626        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1627            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1628
1629        if self._has_remote_blocks:
1630            # Copy any remote blocks to the local cluster.
1631            self._copy_remote_blocks(remote_blocks={})
1632            self._has_remote_blocks = False
1633
1634        if storage_classes:
1635            self._storage_classes_desired = storage_classes
1636
1637        self._my_block_manager().commit_all()
1638        text = self.manifest_text(strip=False)
1639
1640        if create_collection_record:
1641            if name is None:
1642                name = "New collection"
1643                ensure_unique_name = True
1644
1645            body = {"manifest_text": text,
1646                    "name": name,
1647                    "replication_desired": self.replication_desired}
1648            if owner_uuid:
1649                body["owner_uuid"] = owner_uuid
1650            if properties:
1651                body["properties"] = properties
1652            if self.storage_classes_desired():
1653                body["storage_classes_desired"] = self.storage_classes_desired()
1654            if trash_at:
1655                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1656                body["trash_at"] = t
1657            if preserve_version:
1658                body["preserve_version"] = preserve_version
1659
1660            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1661            text = self._api_response["manifest_text"]
1662
1663            self._manifest_locator = self._api_response["uuid"]
1664            self._portable_data_hash = self._api_response["portable_data_hash"]
1665
1666            self._manifest_text = text
1667            self.set_committed(True)
1668
1669        return text
1670
    # Matches one manifest token plus its trailing separator (whitespace or
    # end of text).
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # Matches a Keep block locator: 32-hex-digit md5 hash, "+<size>", then
    # any number of additional "+hint" fields.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # Matches a file segment token: "<position>:<size>:<filename>".
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1674
1675    def _unescape_manifest_path(self, path):
1676        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1677
1678    @synchronized
1679    def _import_manifest(self, manifest_text):
1680        """Import a manifest into a `Collection`.
1681
1682        :manifest_text:
1683          The manifest text to import from.
1684
1685        """
1686        if len(self) > 0:
1687            raise ArgumentError("Can only import manifest into an empty collection")
1688
1689        STREAM_NAME = 0
1690        BLOCKS = 1
1691        SEGMENTS = 2
1692
1693        stream_name = None
1694        state = STREAM_NAME
1695
1696        for token_and_separator in self._token_re.finditer(manifest_text):
1697            tok = token_and_separator.group(1)
1698            sep = token_and_separator.group(2)
1699
1700            if state == STREAM_NAME:
1701                # starting a new stream
1702                stream_name = self._unescape_manifest_path(tok)
1703                blocks = []
1704                segments = []
1705                streamoffset = 0
1706                state = BLOCKS
1707                self.find_or_create(stream_name, COLLECTION)
1708                continue
1709
1710            if state == BLOCKS:
1711                block_locator = self._block_re.match(tok)
1712                if block_locator:
1713                    blocksize = int(block_locator.group(1))
1714                    blocks.append(Range(tok, streamoffset, blocksize, 0))
1715                    streamoffset += blocksize
1716                else:
1717                    state = SEGMENTS
1718
1719            if state == SEGMENTS:
1720                file_segment = self._segment_re.match(tok)
1721                if file_segment:
1722                    pos = int(file_segment.group(1))
1723                    size = int(file_segment.group(2))
1724                    name = self._unescape_manifest_path(file_segment.group(3))
1725                    if name.split('/')[-1] == '.':
1726                        # placeholder for persisting an empty directory, not a real file
1727                        if len(name) > 2:
1728                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1729                    else:
1730                        filepath = os.path.join(stream_name, name)
1731                        try:
1732                            afile = self.find_or_create(filepath, FILE)
1733                        except IOError as e:
1734                            if e.errno == errno.ENOTDIR:
1735                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1736                            else:
1737                                raise e from None
1738                        if isinstance(afile, ArvadosFile):
1739                            afile.add_segment(blocks, pos, size)
1740                        else:
1741                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1742                else:
1743                    # error!
1744                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1745
1746            if sep == "\n":
1747                stream_name = None
1748                state = STREAM_NAME
1749
1750        self.set_committed(True)
1751
1752    @synchronized
1753    def notify(
1754            self,
1755            event: ChangeType,
1756            collection: 'RichCollectionBase',
1757            name: str,
1758            item: CollectionItem,
1759    ) -> None:
1760        if self._callback:
1761            self._callback(event, collection, name, item)

Read and manipulate an Arvados collection

This class provides a high-level interface to create, read, and update Arvados collections and their contents. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.

Collection( manifest_locator_or_text: Optional[str] = None, api_client: Optional[arvados.api_resources.ArvadosAPIClient] = None, keep_client: Optional[arvados.keep.KeepClient] = None, num_retries: int = 10, parent: Optional[Collection] = None, apiconfig: Optional[Mapping[str, str]] = None, block_manager: Optional[arvados.arvfile._BlockManager] = None, replication_desired: Optional[int] = None, storage_classes_desired: Optional[List[str]] = None, put_threads: Optional[int] = None)
1043    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1044                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1045                 keep_client: Optional['arvados.keep.KeepClient']=None,
1046                 num_retries: int=10,
1047                 parent: Optional['Collection']=None,
1048                 apiconfig: Optional[Mapping[str, str]]=None,
1049                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1050                 replication_desired: Optional[int]=None,
1051                 storage_classes_desired: Optional[List[str]]=None,
1052                 put_threads: Optional[int]=None):
1053        """Initialize a Collection object
1054
1055        Arguments:
1056
1057        * manifest_locator_or_text: str | None --- This string can contain a
1058          collection manifest text, portable data hash, or UUID. When given a
1059          portable data hash or UUID, this instance will load a collection
1060          record from the API server. Otherwise, this instance will represent a
1061          new collection without an API server record. The default value `None`
1062          instantiates a new collection with an empty manifest.
1063
1064        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1065          Arvados API client object this instance uses to make requests. If
1066          none is given, this instance creates its own client using the
1067          settings from `apiconfig` (see below). If your client instantiates
1068          many Collection objects, you can help limit memory utilization by
1069          calling `arvados.api.api` to construct an
1070          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1071          for every Collection.
1072
1073        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1074          object this instance uses to make requests. If none is given, this
1075          instance creates its own client using its `api_client`.
1076
1077        * num_retries: int --- The number of times that client requests are
1078          retried. Default 10.
1079
1080        * parent: arvados.collection.Collection | None --- The parent Collection
1081          object of this instance, if any. This argument is primarily used by
1082          other Collection methods; user client code shouldn't need to use it.
1083
1084        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1085          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1086          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1087          Collection object constructs one from these settings. If no
1088          mapping is provided, calls `arvados.config.settings` to get these
1089          parameters from user configuration.
1090
1091        * block_manager: arvados.arvfile._BlockManager | None --- The
1092          _BlockManager object used by this instance to coordinate reading
1093          and writing Keep data blocks. If none is given, this instance
1094          constructs its own. This argument is primarily used by other
1095          Collection methods; user client code shouldn't need to use it.
1096
1097        * replication_desired: int | None --- This controls both the value of
1098          the `replication_desired` field on API collection records saved by
1099          this class, as well as the number of Keep services that the object
1100          writes new data blocks to. If none is given, uses the default value
1101          configured for the cluster.
1102
1103        * storage_classes_desired: list[str] | None --- This controls both
1104          the value of the `storage_classes_desired` field on API collection
1105          records saved by this class, as well as selecting which specific
1106          Keep services the object writes new data blocks to. If none is
1107          given, defaults to an empty list.
1108
1109        * put_threads: int | None --- The number of threads to run
1110          simultaneously to upload data blocks to Keep. This value is used when
1111          building a new `block_manager`. It is unused when a `block_manager`
1112          is provided.
1113        """
1114
1115        if storage_classes_desired and type(storage_classes_desired) is not list:
1116            raise errors.ArgumentError("storage_classes_desired must be list type.")
1117
1118        super(Collection, self).__init__(parent)
1119        self._api_client = api_client
1120        self._keep_client = keep_client
1121
1122        # Use the keep client from ThreadSafeApiCache
1123        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1124            self._keep_client = self._api_client.keep
1125
1126        self._block_manager = block_manager
1127        self.replication_desired = replication_desired
1128        self._storage_classes_desired = storage_classes_desired
1129        self.put_threads = put_threads
1130
1131        if apiconfig:
1132            self._config = apiconfig
1133        else:
1134            self._config = config.settings()
1135
1136        self.num_retries = num_retries
1137        self._manifest_locator = None
1138        self._manifest_text = None
1139        self._portable_data_hash = None
1140        self._api_response = None
1141        self._past_versions = set()
1142
1143        self.lock = threading.RLock()
1144        self.events = None
1145
1146        if manifest_locator_or_text:
1147            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1148                self._manifest_locator = manifest_locator_or_text
1149            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1150                self._manifest_locator = manifest_locator_or_text
1151                if not self._has_local_collection_uuid():
1152                    self._has_remote_blocks = True
1153            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1154                self._manifest_text = manifest_locator_or_text
1155                if '+R' in self._manifest_text:
1156                    self._has_remote_blocks = True
1157            else:
1158                raise errors.ArgumentError(
1159                    "Argument to CollectionReader is not a manifest or a collection UUID")
1160
1161            try:
1162                self._populate()
1163            except errors.SyntaxError as e:
1164                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None

Initialize a Collection object

Arguments:

  • manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value None instantiates a new collection with an empty manifest.

  • api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from apiconfig (see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling arvados.api.api to construct an arvados.safeapi.ThreadSafeApiCache, and use that as the api_client for every Collection.

  • keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its api_client.

  • num_retries: int — The number of times that client requests are retried. Default 10.

  • parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • apiconfig: Mapping[str, str] | None — A mapping with entries for ARVADOS_API_HOST, ARVADOS_API_TOKEN, and optionally ARVADOS_API_HOST_INSECURE. When no api_client is provided, the Collection object constructs one from these settings. If no mapping is provided, calls arvados.config.settings to get these parameters from user configuration.

  • block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • replication_desired: int | None — This controls both the value of the replication_desired field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.

  • storage_classes_desired: list[str] | None — This controls both the value of the storage_classes_desired field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.

  • put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new block_manager. It is unused when a block_manager is provided.

replication_desired
put_threads
num_retries
lock
events
def storage_classes_desired(self) -> List[str]:
1166    def storage_classes_desired(self) -> List[str]:
1167        """Get this collection's `storage_classes_desired` value"""
1168        return self._storage_classes_desired or []

Get this collection’s storage_classes_desired value

def root_collection(self) -> Collection:
1170    def root_collection(self) -> 'Collection':
1171        return self

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def get_properties(self) -> Dict[str, Any]:
1173    def get_properties(self) -> Properties:
1174        """Get this collection's properties
1175
1176        This method always returns a dict. If this collection object does not
1177        have an associated API record, or that record does not have any
1178        properties set, this method returns an empty dict.
1179        """
1180        if self._api_response and self._api_response["properties"]:
1181            return self._api_response["properties"]
1182        else:
1183            return {}

Get this collection’s properties

This method always returns a dict. If this collection object does not have an associated API record, or that record does not have any properties set, this method returns an empty dict.

def get_trash_at(self) -> Optional[datetime.datetime]:
1185    def get_trash_at(self) -> Optional[datetime.datetime]:
1186        """Get this collection's `trash_at` field
1187
1188        This method parses the `trash_at` field of the collection's API
1189        record and returns a datetime from it. If that field is not set, or
1190        this collection object does not have an associated API record,
1191        returns None.
1192        """
1193        if self._api_response and self._api_response["trash_at"]:
1194            try:
1195                return ciso8601.parse_datetime(self._api_response["trash_at"])
1196            except ValueError:
1197                return None
1198        else:
1199            return None

Get this collection’s trash_at field

This method parses the trash_at field of the collection’s API record and returns a datetime from it. If that field is not set, or this collection object does not have an associated API record, returns None.

def stream_name(self) -> str:
1201    def stream_name(self) -> str:
1202        return "."

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

def writable(self) -> bool:
1204    def writable(self) -> bool:
1205        return True

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

@synchronized
def known_past_version( self, modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]) -> bool:
1207    @synchronized
1208    def known_past_version(
1209            self,
1210            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1211    ) -> bool:
1212        """Indicate whether an API record for this collection has been seen before
1213
1214        As this collection object loads records from the API server, it records
1215        their `modified_at` and `portable_data_hash` fields. This method accepts
1216        a 2-tuple with values for those fields, and returns `True` if the
1217        combination was previously loaded.
1218        """
1219        return modified_at_and_portable_data_hash in self._past_versions

Indicate whether an API record for this collection has been seen before

As this collection object loads records from the API server, it records their modified_at and portable_data_hash fields. This method accepts a 2-tuple with values for those fields, and returns True if the combination was previously loaded.

@synchronized
@retry_method
def update( self, other: Optional[Collection] = None, num_retries: Optional[int] = None) -> None:
    @synchronized
    @retry_method
    def update(
            self,
            other: Optional['Collection']=None,
            num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it.  If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
        """
        if other is None:
            # No explicit source collection: refresh from this collection's
            # own API record instead.
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._remember_api_response(response)
            other = CollectionReader(response["manifest_text"])
        # Diff against the manifest as of the last sync (`baseline`), then
        # replay those changes on top of this instance's current state.
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        self._manifest_text = self.manifest_text()

Merge another collection’s contents into this one

This method compares the manifest of this collection instance with another, then updates this instance’s manifest with changes from the other, renaming files to flag conflicts where necessary.

When called without any arguments, this method reloads the collection’s API record, and updates this instance with any changes that have appeared server-side. If this instance does not have a corresponding API record, this method raises arvados.errors.ArgumentError.

Arguments:

  • other: Collection | None — The collection whose contents should be merged into this instance. When not provided, this method reloads this collection’s API record and constructs a Collection object from it. If this instance does not have a corresponding API record, this method raises arvados.errors.ArgumentError.

  • num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the num_retries provided when this instance was constructed.

def stop_threads(self) -> None:
1356    def stop_threads(self) -> None:
1357        """Stop background Keep upload/download threads"""
1358        if self._block_manager is not None:
1359            self._block_manager.stop_threads()

Stop background Keep upload/download threads

@synchronized
def manifest_locator(self) -> Optional[str]:
1361    @synchronized
1362    def manifest_locator(self) -> Optional[str]:
1363        """Get this collection's manifest locator, if any
1364
1365        * If this collection instance is associated with an API record with a
1366          UUID, return that.
1367        * Otherwise, if this collection instance was loaded from an API record
1368          by portable data hash, return that.
1369        * Otherwise, return `None`.
1370        """
1371        return self._manifest_locator

Get this collection’s manifest locator, if any

  • If this collection instance is associated with an API record with a UUID, return that.
  • Otherwise, if this collection instance was loaded from an API record by portable data hash, return that.
  • Otherwise, return None.
@synchronized
def clone( self, new_parent: Optional[Collection] = None, new_name: Optional[str] = None, readonly: bool = False, new_config: Optional[Mapping[str, str]] = None) -> Collection:
1373    @synchronized
1374    def clone(
1375            self,
1376            new_parent: Optional['Collection']=None,
1377            new_name: Optional[str]=None,
1378            readonly: bool=False,
1379            new_config: Optional[Mapping[str, str]]=None,
1380    ) -> 'Collection':
1381        """Create a Collection object with the same contents as this instance
1382
1383        This method creates a new Collection object with contents that match
1384        this instance's. The new collection will not be associated with any API
1385        record.
1386
1387        Arguments:
1388
1389        * new_parent: arvados.collection.Collection | None --- This value is
1390          passed to the new Collection's constructor as the `parent`
1391          argument.
1392
1393        * new_name: str | None --- This value is unused.
1394
1395        * readonly: bool --- If this value is true, this method constructs and
1396          returns a `CollectionReader`. Otherwise, it returns a mutable
1397          `Collection`. Default `False`.
1398
1399        * new_config: Mapping[str, str] | None --- This value is passed to the
1400          new Collection's constructor as `apiconfig`. If no value is provided,
1401          defaults to the configuration passed to this instance's constructor.
1402        """
1403        if new_config is None:
1404            new_config = self._config
1405        if readonly:
1406            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1407        else:
1408            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1409
1410        newcollection._clonefrom(self)
1411        return newcollection

Create a Collection object with the same contents as this instance

This method creates a new Collection object with contents that match this instance’s. The new collection will not be associated with any API record.

Arguments:

  • new_parent: Collection | None — This value is passed to the new Collection’s constructor as the parent argument.

  • new_name: str | None — This value is unused.

  • readonly: bool — If this value is true, this method constructs and returns a CollectionReader. Otherwise, it returns a mutable Collection. Default False.

  • new_config: Mapping[str, str] | None — This value is passed to the new Collection’s constructor as apiconfig. If no value is provided, defaults to the configuration passed to this instance’s constructor.

@synchronized
def api_response(self) -> Optional[Dict[str, Any]]:
1413    @synchronized
1414    def api_response(self) -> Optional[Dict[str, Any]]:
1415        """Get this instance's associated API record
1416
1417        If this Collection instance has an associated API record, return it.
1418        Otherwise, return `None`.
1419        """
1420        return self._api_response

Get this instance’s associated API record

If this Collection instance has an associated API record, return it. Otherwise, return None.

def find_or_create( self, path: str, create_type: Literal['collection', 'file']) -> Union[arvados.arvfile.ArvadosFile, Collection]:
1422    def find_or_create(
1423            self,
1424            path: str,
1425            create_type: CreateType,
1426    ) -> CollectionItem:
1427        if path == ".":
1428            return self
1429        else:
1430            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

Get the item at the given path, creating it if necessary

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of create_type.

Arguments:

  • path: str — The path to find or create within this collection.

  • create_type: Literal[COLLECTION, FILE] — The type of object to create at path if one does not exist. Passing COLLECTION creates a stream and returns the corresponding Subcollection. Passing FILE creates a new file and returns the corresponding arvados.arvfile.ArvadosFile.

def find( self, path: str) -> Union[arvados.arvfile.ArvadosFile, Collection]:
1432    def find(self, path: str) -> CollectionItem:
1433        if path == ".":
1434            return self
1435        else:
1436            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

Get the item at the given path

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method raises NotADirectoryError.

Arguments:

  • path: str — The path to find within this collection.
def remove(self, path: str, recursive: bool = False) -> None:
1438    def remove(self, path: str, recursive: bool=False) -> None:
1439        if path == ".":
1440            raise errors.ArgumentError("Cannot remove '.'")
1441        else:
1442            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)

Remove the file or stream at path

Arguments:

  • path: str — The path of the item to remove from the collection

  • recursive: bool — Controls the method’s behavior if path refers to a nonempty stream. If False (the default), this method raises OSError with errno ENOTEMPTY. If True, this method removes all items under the stream.

@must_be_writable
@synchronized
@retry_method
def save( self, properties: Optional[Dict[str, Any]] = None, storage_classes: Optional[List[str]] = None, trash_at: Optional[datetime.datetime] = None, merge: bool = True, num_retries: Optional[int] = None, preserve_version: bool = False) -> str:
    @must_be_writable
    @synchronized
    @retry_method
    def save(
            self,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            merge: bool=True,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ) -> str:
        """Save collection to an existing API record

        This method updates the instance's corresponding API record to match
        the instance's state. If this instance does not have a corresponding API
        record yet, raises `AssertionError`. (To create a new API record, use
        `Collection.save_new`.) This method returns the saved collection
        manifest.

        Arguments:

        * properties: dict[str, Any] | None --- If provided, the API record will
          be updated with these properties. Note this will completely replace
          any existing properties.

        * storage_classes: list[str] | None --- If provided, the API record will
          be updated with this value in the `storage_classes_desired` field.
          This value will also be saved on the instance and used for any
          changes that follow.

        * trash_at: datetime.datetime | None --- If provided, the API record
          will be updated with this value in the `trash_at` field.

        * merge: bool --- If `True` (the default), this method will first
          reload this collection's API record, and merge any new contents into
          this instance before saving changes. See `Collection.update` for
          details.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed directly to
          the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        # Validate all arguments before touching instance or server state.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        # Accumulate the metadata fields to send with the update request.
        body={}
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            # NOTE(review): formats with a trailing "Z", which assumes
            # trash_at is UTC; a naive local datetime would be mislabeled --
            # confirm with callers.
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            # Finish uploading any buffered data blocks before generating
            # the manifest to save.
            self._my_block_manager().commit_all()

            if merge:
                self.update()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)
        elif body:
            # Content is unchanged, but metadata (properties, storage classes,
            # trash_at, preserve_version) still needs to be written.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))

        return self._manifest_text

Save collection to an existing API record

This method updates the instance’s corresponding API record to match the instance’s state. If this instance does not have a corresponding API record yet, raises AssertionError. (To create a new API record, use Collection.save_new.) This method returns the saved collection manifest.

Arguments:

  • properties: dict[str, Any] | None — If provided, the API record will be updated with these properties. Note this will completely replace any existing properties.

  • storage_classes: list[str] | None — If provided, the API record will be updated with this value in the storage_classes_desired field. This value will also be saved on the instance and used for any changes that follow.

  • trash_at: datetime.datetime | None — If provided, the API record will be updated with this value in the trash_at field.

  • merge: bool — If True (the default), this method will first reload this collection’s API record, and merge any new contents into this instance before saving changes. See Collection.update for details.

  • num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the num_retries provided when this instance was constructed.

  • preserve_version: bool — This value will be passed directly to the underlying API call. If True, the Arvados API will preserve the versions of this collection both immediately before and after the update. If True when the API server is not configured with collection versioning, this method raises arvados.errors.ArgumentError.

@must_be_writable
@synchronized
@retry_method
def save_new( self, name: Optional[str] = None, create_collection_record: bool = True, owner_uuid: Optional[str] = None, properties: Optional[Dict[str, Any]] = None, storage_classes: Optional[List[str]] = None, trash_at: Optional[datetime.datetime] = None, ensure_unique_name: bool = False, num_retries: Optional[int] = None, preserve_version: bool = False):
1553    @must_be_writable
1554    @synchronized
1555    @retry_method
1556    def save_new(
1557            self,
1558            name: Optional[str]=None,
1559            create_collection_record: bool=True,
1560            owner_uuid: Optional[str]=None,
1561            properties: Optional[Properties]=None,
1562            storage_classes: Optional[StorageClasses]=None,
1563            trash_at: Optional[datetime.datetime]=None,
1564            ensure_unique_name: bool=False,
1565            num_retries: Optional[int]=None,
1566            preserve_version: bool=False,
1567    ):
1568        """Save collection to a new API record
1569
1570        This method finishes uploading new data blocks and (optionally)
1571        creates a new API collection record with the provided data. If a new
1572        record is created, this instance becomes associated with that record
1573        for future updates like `save()`. This method returns the saved
1574        collection manifest.
1575
1576        Arguments:
1577
1578        * name: str | None --- The `name` field to use on the new collection
1579          record. If not specified, a generic default name is generated.
1580
1581        * create_collection_record: bool --- If `True` (the default), creates a
1582          collection record on the API server. If `False`, the method finishes
1583          all data uploads and only returns the resulting collection manifest
1584          without sending it to the API server.
1585
1586        * owner_uuid: str | None --- The `owner_uuid` field to use on the
1587          new collection record.
1588
1589        * properties: dict[str, Any] | None --- The `properties` field to use on
1590          the new collection record.
1591
1592        * storage_classes: list[str] | None --- The
1593          `storage_classes_desired` field to use on the new collection record.
1594
1595        * trash_at: datetime.datetime | None --- The `trash_at` field to use
1596          on the new collection record.
1597
1598        * ensure_unique_name: bool --- This value is passed directly to the
1599          Arvados API when creating the collection record. If `True`, the API
1600          server may modify the submitted `name` to ensure the collection's
1601          `name`+`owner_uuid` combination is unique. If `False` (the default),
1602          if a collection already exists with this same `name`+`owner_uuid`
1603          combination, creating a collection record will raise a validation
1604          error.
1605
1606        * num_retries: int | None --- The number of times to retry reloading
1607          the collection's API record from the API server. If not specified,
1608          uses the `num_retries` provided when this instance was constructed.
1609
1610        * preserve_version: bool --- This value will be passed to directly
1611          to the underlying API call. If `True`, the Arvados API will
1612          preserve the versions of this collection both immediately before
1613          and after the update. If `True` when the API server is not
1614          configured with collection versioning, this method raises
1615          `arvados.errors.ArgumentError`.
1616        """
1617        if properties and type(properties) is not dict:
1618            raise errors.ArgumentError("properties must be dictionary type.")
1619
1620        if storage_classes and type(storage_classes) is not list:
1621            raise errors.ArgumentError("storage_classes must be list type.")
1622
1623        if trash_at and type(trash_at) is not datetime.datetime:
1624            raise errors.ArgumentError("trash_at must be datetime type.")
1625
1626        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1627            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1628
1629        if self._has_remote_blocks:
1630            # Copy any remote blocks to the local cluster.
1631            self._copy_remote_blocks(remote_blocks={})
1632            self._has_remote_blocks = False
1633
1634        if storage_classes:
1635            self._storage_classes_desired = storage_classes
1636
1637        self._my_block_manager().commit_all()
1638        text = self.manifest_text(strip=False)
1639
1640        if create_collection_record:
1641            if name is None:
1642                name = "New collection"
1643                ensure_unique_name = True
1644
1645            body = {"manifest_text": text,
1646                    "name": name,
1647                    "replication_desired": self.replication_desired}
1648            if owner_uuid:
1649                body["owner_uuid"] = owner_uuid
1650            if properties:
1651                body["properties"] = properties
1652            if self.storage_classes_desired():
1653                body["storage_classes_desired"] = self.storage_classes_desired()
1654            if trash_at:
1655                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1656                body["trash_at"] = t
1657            if preserve_version:
1658                body["preserve_version"] = preserve_version
1659
1660            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1661            text = self._api_response["manifest_text"]
1662
1663            self._manifest_locator = self._api_response["uuid"]
1664            self._portable_data_hash = self._api_response["portable_data_hash"]
1665
1666            self._manifest_text = text
1667            self.set_committed(True)
1668
1669        return text

Save collection to a new API record

This method finishes uploading new data blocks and (optionally) creates a new API collection record with the provided data. If a new record is created, this instance becomes associated with that record for future updates like save(). This method returns the saved collection manifest.

Arguments:

  • name: str | None — The name field to use on the new collection record. If not specified, a generic default name is generated.

  • create_collection_record: bool — If True (the default), creates a collection record on the API server. If False, the method finishes all data uploads and only returns the resulting collection manifest without sending it to the API server.

  • owner_uuid: str | None — The owner_uuid field to use on the new collection record.

  • properties: dict[str, Any] | None — The properties field to use on the new collection record.

  • storage_classes: list[str] | None — The storage_classes_desired field to use on the new collection record.

  • trash_at: datetime.datetime | None — The trash_at field to use on the new collection record.

  • ensure_unique_name: bool — This value is passed directly to the Arvados API when creating the collection record. If True, the API server may modify the submitted name to ensure the collection’s name+owner_uuid combination is unique. If False (the default), if a collection already exists with this same name+owner_uuid combination, creating a collection record will raise a validation error.

  • num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the num_retries provided when this instance was constructed.

  • preserve_version: bool — This value will be passed directly to the underlying API call. If True, the Arvados API will preserve the versions of this collection both immediately before and after the update. If True when the API server is not configured with collection versioning, this method raises arvados.errors.ArgumentError.

@synchronized
def notify( self, event: Literal['add', 'del', 'mod', 'tok'], collection: RichCollectionBase, name: str, item: Union[arvados.arvfile.ArvadosFile, Collection]) -> None:
1752    @synchronized
1753    def notify(
1754            self,
1755            event: ChangeType,
1756            collection: 'RichCollectionBase',
1757            name: str,
1758            item: CollectionItem,
1759    ) -> None:
1760        if self._callback:
1761            self._callback(event, collection, name, item)

Notify any subscribed callback about a change to this collection

If a callback has been registered with RichCollectionBase.subscribe, it will be called with information about a change to this collection. Then this notification will be propagated to this collection’s root.

Arguments:

  • event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.

  • collection: RichCollectionBase — The collection that was modified.

  • name: str — The name of the file or stream within collection that was modified.

  • item: arvados.arvfile.ArvadosFile | Collection — The new contents at name within collection.

class Subcollection(RichCollectionBase):
1764class Subcollection(RichCollectionBase):
1765    """Read and manipulate a stream/directory within an Arvados collection
1766
1767    This class represents a single stream (like a directory) within an Arvados
1768    `Collection`. It is returned by `Collection.find` and provides the same API.
1769    Operations that work on the API collection record propagate to the parent
1770    `Collection` object.
1771    """
1772
1773    def __init__(self, parent, name):
1774        super(Subcollection, self).__init__(parent)
1775        self.lock = self.root_collection().lock
1776        self._manifest_text = None
1777        self.name = name
1778        self.num_retries = parent.num_retries
1779
1780    def root_collection(self) -> 'Collection':
1781        return self.parent.root_collection()
1782
1783    def writable(self) -> bool:
1784        return self.root_collection().writable()
1785
1786    def _my_api(self):
1787        return self.root_collection()._my_api()
1788
1789    def _my_keep(self):
1790        return self.root_collection()._my_keep()
1791
1792    def _my_block_manager(self):
1793        return self.root_collection()._my_block_manager()
1794
1795    def stream_name(self) -> str:
1796        return os.path.join(self.parent.stream_name(), self.name)
1797
1798    @synchronized
1799    def clone(
1800            self,
1801            new_parent: Optional['Collection']=None,
1802            new_name: Optional[str]=None,
1803    ) -> 'Subcollection':
1804        c = Subcollection(new_parent, new_name)
1805        c._clonefrom(self)
1806        return c
1807
1808    @must_be_writable
1809    @synchronized
1810    def _reparent(self, newparent, newname):
1811        self.set_committed(False)
1812        self.flush()
1813        self.parent.remove(self.name, recursive=True)
1814        self.parent = newparent
1815        self.name = newname
1816        self.lock = self.parent.root_collection().lock
1817
1818    @synchronized
1819    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1820        """Encode empty directories by using an \056-named (".") empty file"""
1821        if len(self._items) == 0:
1822            return "%s %s 0:0:\\056\n" % (
1823                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1824        return super(Subcollection, self)._get_manifest_text(stream_name,
1825                                                             strip, normalize,
1826                                                             only_committed)

Read and manipulate a stream/directory within an Arvados collection

This class represents a single stream (like a directory) within an Arvados Collection. It is returned by Collection.find and provides the same API. Operations that work on the API collection record propagate to the parent Collection object.

Subcollection(parent, name)
1773    def __init__(self, parent, name):
1774        super(Subcollection, self).__init__(parent)
1775        self.lock = self.root_collection().lock
1776        self._manifest_text = None
1777        self.name = name
1778        self.num_retries = parent.num_retries
lock
name
num_retries
def root_collection(self) -> Collection:
1780    def root_collection(self) -> 'Collection':
1781        return self.parent.root_collection()

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def writable(self) -> bool:
1783    def writable(self) -> bool:
1784        return self.root_collection().writable()

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

def stream_name(self) -> str:
1795    def stream_name(self) -> str:
1796        return os.path.join(self.parent.stream_name(), self.name)

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

@synchronized
def clone( self, new_parent: Optional[Collection] = None, new_name: Optional[str] = None) -> Subcollection:
1798    @synchronized
1799    def clone(
1800            self,
1801            new_parent: Optional['Collection']=None,
1802            new_name: Optional[str]=None,
1803    ) -> 'Subcollection':
1804        c = Subcollection(new_parent, new_name)
1805        c._clonefrom(self)
1806        return c
class CollectionReader(Collection):
1829class CollectionReader(Collection):
1830    """Read-only `Collection` subclass
1831
1832    This class will never create or update any API collection records. You can
1833    use this class for additional code safety when you only need to read
1834    existing collections.
1835    """
1836    def __init__(self, manifest_locator_or_text, *args, **kwargs):
1837        self._in_init = True
1838        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1839        self._in_init = False
1840
1841        # Forego any locking since it should never change once initialized.
1842        self.lock = NoopLock()
1843
1844        # Backwards compatability with old CollectionReader
1845        # all_streams() and all_files()
1846        self._streams = None
1847
1848    def writable(self) -> bool:
1849        return self._in_init
1850
1851    def _populate_streams(orig_func):
1852        @functools.wraps(orig_func)
1853        def populate_streams_wrapper(self, *args, **kwargs):
1854            # Defer populating self._streams until needed since it creates a copy of the manifest.
1855            if self._streams is None:
1856                if self._manifest_text:
1857                    self._streams = [sline.split()
1858                                     for sline in self._manifest_text.split("\n")
1859                                     if sline]
1860                else:
1861                    self._streams = []
1862            return orig_func(self, *args, **kwargs)
1863        return populate_streams_wrapper
1864
1865    @arvados.util._deprecated('3.0', 'Collection iteration')
1866    @_populate_streams
1867    def normalize(self):
1868        """Normalize the streams returned by `all_streams`"""
1869        streams = {}
1870        for s in self.all_streams():
1871            for f in s.all_files():
1872                streamname, filename = split(s.name() + "/" + f.name())
1873                if streamname not in streams:
1874                    streams[streamname] = {}
1875                if filename not in streams[streamname]:
1876                    streams[streamname][filename] = []
1877                for r in f.segments:
1878                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1879
1880        self._streams = [normalize_stream(s, streams[s])
1881                         for s in sorted(streams)]
1882
1883    @arvados.util._deprecated('3.0', 'Collection iteration')
1884    @_populate_streams
1885    def all_streams(self):
1886        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1887                for s in self._streams]
1888
1889    @arvados.util._deprecated('3.0', 'Collection iteration')
1890    @_populate_streams
1891    def all_files(self):
1892        for s in self.all_streams():
1893            for f in s.all_files():
1894                yield f

Read-only Collection subclass

This class will never create or update any API collection records. You can use this class for additional code safety when you only need to read existing collections.

CollectionReader(manifest_locator_or_text, *args, **kwargs)
1836    def __init__(self, manifest_locator_or_text, *args, **kwargs):
1837        self._in_init = True
1838        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1839        self._in_init = False
1840
1841        # Forego any locking since it should never change once initialized.
1842        self.lock = NoopLock()
1843
1844        # Backwards compatability with old CollectionReader
1845        # all_streams() and all_files()
1846        self._streams = None

Initialize a Collection object

Arguments:

  • manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value None instantiates a new collection with an empty manifest.

  • api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from apiconfig (see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling arvados.api.api to construct an arvados.safeapi.ThreadSafeApiCache, and use that as the api_client for every Collection.

  • keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its api_client.

  • num_retries: int — The number of times that client requests are retried. Default 10.

  • parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • apiconfig: Mapping[str, str] | None — A mapping with entries for ARVADOS_API_HOST, ARVADOS_API_TOKEN, and optionally ARVADOS_API_HOST_INSECURE. When no api_client is provided, the Collection object constructs one from these settings. If no mapping is provided, calls arvados.config.settings to get these parameters from user configuration.

  • block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • replication_desired: int | None — This controls both the value of the replication_desired field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.

  • storage_classes_desired: list[str] | None — This controls both the value of the storage_classes_desired field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.

  • put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new block_manager. It is unused when a block_manager is provided.

lock
def writable(self) -> bool:
1848    def writable(self) -> bool:
1849        return self._in_init

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

@arvados.util._deprecated('3.0', 'Collection iteration')
def normalize(self):
1865    @arvados.util._deprecated('3.0', 'Collection iteration')
1866    @_populate_streams
1867    def normalize(self):
1868        """Normalize the streams returned by `all_streams`"""
1869        streams = {}
1870        for s in self.all_streams():
1871            for f in s.all_files():
1872                streamname, filename = split(s.name() + "/" + f.name())
1873                if streamname not in streams:
1874                    streams[streamname] = {}
1875                if filename not in streams[streamname]:
1876                    streams[streamname][filename] = []
1877                for r in f.segments:
1878                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1879
1880        self._streams = [normalize_stream(s, streams[s])
1881                         for s in sorted(streams)]

Normalize the streams returned by all_streams

@arvados.util._deprecated('3.0', 'Collection iteration')
def all_streams(self):
1883    @arvados.util._deprecated('3.0', 'Collection iteration')
1884    @_populate_streams
1885    def all_streams(self):
1886        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1887                for s in self._streams]
@arvados.util._deprecated('3.0', 'Collection iteration')
def all_files(self):
1889    @arvados.util._deprecated('3.0', 'Collection iteration')
1890    @_populate_streams
1891    def all_files(self):
1892        for s in self.all_streams():
1893            for f in s.all_files():
1894                yield f
class CollectionWriter(CollectionBase):
1897class CollectionWriter(CollectionBase):
1898    """Create a new collection from scratch
1899
1900    .. WARNING:: Deprecated
1901       This class is deprecated. Prefer `arvados.collection.Collection`
1902       instead.
1903    """
1904
1905    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1906    def __init__(self, api_client=None, num_retries=0, replication=None):
1907        """Instantiate a CollectionWriter.
1908
1909        CollectionWriter lets you build a new Arvados Collection from scratch.
1910        Write files to it.  The CollectionWriter will upload data to Keep as
1911        appropriate, and provide you with the Collection manifest text when
1912        you're finished.
1913
1914        Arguments:
1915        * api_client: The API client to use to look up Collections.  If not
1916          provided, CollectionReader will build one from available Arvados
1917          configuration.
1918        * num_retries: The default number of times to retry failed
1919          service requests.  Default 0.  You may change this value
1920          after instantiation, but note those changes may not
1921          propagate to related objects like the Keep client.
1922        * replication: The number of copies of each block to store.
1923          If this argument is None or not supplied, replication is
1924          the server-provided default if available, otherwise 2.
1925        """
1926        self._api_client = api_client
1927        self.num_retries = num_retries
1928        self.replication = (2 if replication is None else replication)
1929        self._keep_client = None
1930        self._data_buffer = []
1931        self._data_buffer_len = 0
1932        self._current_stream_files = []
1933        self._current_stream_length = 0
1934        self._current_stream_locators = []
1935        self._current_stream_name = '.'
1936        self._current_file_name = None
1937        self._current_file_pos = 0
1938        self._finished_streams = []
1939        self._close_file = None
1940        self._queued_file = None
1941        self._queued_dirents = deque()
1942        self._queued_trees = deque()
1943        self._last_open = None
1944
1945    def __exit__(self, exc_type, exc_value, traceback):
1946        if exc_type is None:
1947            self.finish()
1948
1949    def do_queued_work(self):
1950        # The work queue consists of three pieces:
1951        # * _queued_file: The file object we're currently writing to the
1952        #   Collection.
1953        # * _queued_dirents: Entries under the current directory
1954        #   (_queued_trees[0]) that we want to write or recurse through.
1955        #   This may contain files from subdirectories if
1956        #   max_manifest_depth == 0 for this directory.
1957        # * _queued_trees: Directories that should be written as separate
1958        #   streams to the Collection.
1959        # This function handles the smallest piece of work currently queued
1960        # (current file, then current directory, then next directory) until
1961        # no work remains.  The _work_THING methods each do a unit of work on
1962        # THING.  _queue_THING methods add a THING to the work queue.
1963        while True:
1964            if self._queued_file:
1965                self._work_file()
1966            elif self._queued_dirents:
1967                self._work_dirents()
1968            elif self._queued_trees:
1969                self._work_trees()
1970            else:
1971                break
1972
1973    def _work_file(self):
1974        while True:
1975            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
1976            if not buf:
1977                break
1978            self.write(buf)
1979        self.finish_current_file()
1980        if self._close_file:
1981            self._queued_file.close()
1982        self._close_file = None
1983        self._queued_file = None
1984
1985    def _work_dirents(self):
1986        path, stream_name, max_manifest_depth = self._queued_trees[0]
1987        if stream_name != self.current_stream_name():
1988            self.start_new_stream(stream_name)
1989        while self._queued_dirents:
1990            dirent = self._queued_dirents.popleft()
1991            target = os.path.join(path, dirent)
1992            if os.path.isdir(target):
1993                self._queue_tree(target,
1994                                 os.path.join(stream_name, dirent),
1995                                 max_manifest_depth - 1)
1996            else:
1997                self._queue_file(target, dirent)
1998                break
1999        if not self._queued_dirents:
2000            self._queued_trees.popleft()
2001
2002    def _work_trees(self):
2003        path, stream_name, max_manifest_depth = self._queued_trees[0]
2004        d = arvados.util.listdir_recursive(
2005            path, max_depth = (None if max_manifest_depth == 0 else 0))
2006        if d:
2007            self._queue_dirents(stream_name, d)
2008        else:
2009            self._queued_trees.popleft()
2010
2011    def _queue_file(self, source, filename=None):
2012        assert (self._queued_file is None), "tried to queue more than one file"
2013        if not hasattr(source, 'read'):
2014            source = open(source, 'rb')
2015            self._close_file = True
2016        else:
2017            self._close_file = False
2018        if filename is None:
2019            filename = os.path.basename(source.name)
2020        self.start_new_file(filename)
2021        self._queued_file = source
2022
2023    def _queue_dirents(self, stream_name, dirents):
2024        assert (not self._queued_dirents), "tried to queue more than one tree"
2025        self._queued_dirents = deque(sorted(dirents))
2026
2027    def _queue_tree(self, path, stream_name, max_manifest_depth):
2028        self._queued_trees.append((path, stream_name, max_manifest_depth))
2029
2030    def write_file(self, source, filename=None):
2031        self._queue_file(source, filename)
2032        self.do_queued_work()
2033
2034    def write_directory_tree(self,
2035                             path, stream_name='.', max_manifest_depth=-1):
2036        self._queue_tree(path, stream_name, max_manifest_depth)
2037        self.do_queued_work()
2038
2039    def write(self, newdata):
2040        if isinstance(newdata, bytes):
2041            pass
2042        elif isinstance(newdata, str):
2043            newdata = newdata.encode()
2044        elif hasattr(newdata, '__iter__'):
2045            for s in newdata:
2046                self.write(s)
2047            return
2048        self._data_buffer.append(newdata)
2049        self._data_buffer_len += len(newdata)
2050        self._current_stream_length += len(newdata)
2051        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2052            self.flush_data()
2053
2054    def open(self, streampath, filename=None):
2055        """open(streampath[, filename]) -> file-like object
2056
2057        Pass in the path of a file to write to the Collection, either as a
2058        single string or as two separate stream name and file name arguments.
2059        This method returns a file-like object you can write to add it to the
2060        Collection.
2061
2062        You may only have one file object from the Collection open at a time,
2063        so be sure to close the object when you're done.  Using the object in
2064        a with statement makes that easy:
2065
2066            with cwriter.open('./doc/page1.txt') as outfile:
2067                outfile.write(page1_data)
2068            with cwriter.open('./doc/page2.txt') as outfile:
2069                outfile.write(page2_data)
2070        """
2071        if filename is None:
2072            streampath, filename = split(streampath)
2073        if self._last_open and not self._last_open.closed:
2074            raise errors.AssertionError(
2075                u"can't open '{}' when '{}' is still open".format(
2076                    filename, self._last_open.name))
2077        if streampath != self.current_stream_name():
2078            self.start_new_stream(streampath)
2079        self.set_current_file_name(filename)
2080        self._last_open = _WriterFile(self, filename)
2081        return self._last_open
2082
2083    def flush_data(self):
2084        data_buffer = b''.join(self._data_buffer)
2085        if data_buffer:
2086            self._current_stream_locators.append(
2087                self._my_keep().put(
2088                    data_buffer[0:config.KEEP_BLOCK_SIZE],
2089                    copies=self.replication))
2090            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2091            self._data_buffer_len = len(self._data_buffer[0])
2092
2093    def start_new_file(self, newfilename=None):
2094        self.finish_current_file()
2095        self.set_current_file_name(newfilename)
2096
2097    def set_current_file_name(self, newfilename):
2098        if re.search(r'[\t\n]', newfilename):
2099            raise errors.AssertionError(
2100                "Manifest filenames cannot contain whitespace: %s" %
2101                newfilename)
2102        elif re.search(r'\x00', newfilename):
2103            raise errors.AssertionError(
2104                "Manifest filenames cannot contain NUL characters: %s" %
2105                newfilename)
2106        self._current_file_name = newfilename
2107
2108    def current_file_name(self):
2109        return self._current_file_name
2110
2111    def finish_current_file(self):
2112        if self._current_file_name is None:
2113            if self._current_file_pos == self._current_stream_length:
2114                return
2115            raise errors.AssertionError(
2116                "Cannot finish an unnamed file " +
2117                "(%d bytes at offset %d in '%s' stream)" %
2118                (self._current_stream_length - self._current_file_pos,
2119                 self._current_file_pos,
2120                 self._current_stream_name))
2121        self._current_stream_files.append([
2122                self._current_file_pos,
2123                self._current_stream_length - self._current_file_pos,
2124                self._current_file_name])
2125        self._current_file_pos = self._current_stream_length
2126        self._current_file_name = None
2127
2128    def start_new_stream(self, newstreamname='.'):
2129        self.finish_current_stream()
2130        self.set_current_stream_name(newstreamname)
2131
2132    def set_current_stream_name(self, newstreamname):
2133        if re.search(r'[\t\n]', newstreamname):
2134            raise errors.AssertionError(
2135                "Manifest stream names cannot contain whitespace: '%s'" %
2136                (newstreamname))
2137        self._current_stream_name = '.' if newstreamname=='' else newstreamname
2138
2139    def current_stream_name(self):
2140        return self._current_stream_name
2141
2142    def finish_current_stream(self):
2143        self.finish_current_file()
2144        self.flush_data()
2145        if not self._current_stream_files:
2146            pass
2147        elif self._current_stream_name is None:
2148            raise errors.AssertionError(
2149                "Cannot finish an unnamed stream (%d bytes in %d files)" %
2150                (self._current_stream_length, len(self._current_stream_files)))
2151        else:
2152            if not self._current_stream_locators:
2153                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2154            self._finished_streams.append([self._current_stream_name,
2155                                           self._current_stream_locators,
2156                                           self._current_stream_files])
2157        self._current_stream_files = []
2158        self._current_stream_length = 0
2159        self._current_stream_locators = []
2160        self._current_stream_name = None
2161        self._current_file_pos = 0
2162        self._current_file_name = None
2163
2164    def finish(self):
2165        """Store the manifest in Keep and return its locator.
2166
2167        This is useful for storing manifest fragments (task outputs)
2168        temporarily in Keep during a Crunch job.
2169
2170        In other cases you should make a collection instead, by
2171        sending manifest_text() to the API server's "create
2172        collection" endpoint.
2173        """
2174        return self._my_keep().put(self.manifest_text().encode(),
2175                                   copies=self.replication)
2176
2177    def portable_data_hash(self):
2178        stripped = self.stripped_manifest().encode()
2179        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
2180
2181    def manifest_text(self):
2182        self.finish_current_stream()
2183        manifest = ''
2184
2185        for stream in self._finished_streams:
2186            if not re.search(r'^\.(/.*)?$', stream[0]):
2187                manifest += './'
2188            manifest += stream[0].replace(' ', '\\040')
2189            manifest += ' ' + ' '.join(stream[1])
2190            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2191            manifest += "\n"
2192
2193        return manifest
2194
2195    def data_locators(self):
2196        ret = []
2197        for name, locators, files in self._finished_streams:
2198            ret += locators
2199        return ret
2200
2201    def save_new(self, name=None):
2202        return self._api_client.collections().create(
2203            ensure_unique_name=True,
2204            body={
2205                'name': name,
2206                'manifest_text': self.manifest_text(),
2207            }).execute(num_retries=self.num_retries)

Create a new collection from scratch

@arvados.util._deprecated('3.0', 'arvados.collection.Collection')
CollectionWriter(api_client=None, num_retries=0, replication=None)
1905    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1906    def __init__(self, api_client=None, num_retries=0, replication=None):
1907        """Instantiate a CollectionWriter.
1908
1909        CollectionWriter lets you build a new Arvados Collection from scratch.
1910        Write files to it.  The CollectionWriter will upload data to Keep as
1911        appropriate, and provide you with the Collection manifest text when
1912        you're finished.
1913
1914        Arguments:
1915        * api_client: The API client to use to look up Collections.  If not
1916          provided, CollectionWriter will build one from available Arvados
1917          configuration.
1918        * num_retries: The default number of times to retry failed
1919          service requests.  Default 0.  You may change this value
1920          after instantiation, but note those changes may not
1921          propagate to related objects like the Keep client.
1922        * replication: The number of copies of each block to store.
1923          If this argument is None or not supplied, replication is
1924          the server-provided default if available, otherwise 2.
1925        """
1926        self._api_client = api_client
1927        self.num_retries = num_retries
1928        self.replication = (2 if replication is None else replication)
1929        self._keep_client = None
        # Data buffered for the current stream, not yet written to Keep.
1930        self._data_buffer = []
1931        self._data_buffer_len = 0
        # Per-stream and per-file bookkeeping used to build the manifest.
1932        self._current_stream_files = []
1933        self._current_stream_length = 0
1934        self._current_stream_locators = []
1935        self._current_stream_name = '.'
1936        self._current_file_name = None
1937        self._current_file_pos = 0
1938        self._finished_streams = []
        # Work-queue state consumed by do_queued_work.
1939        self._close_file = None
1940        self._queued_file = None
1941        self._queued_dirents = deque()
1942        self._queued_trees = deque()
        # Tracks the file handle returned by open(), to enforce the
        # one-open-file-at-a-time rule.
1943        self._last_open = None

Instantiate a CollectionWriter.

CollectionWriter lets you build a new Arvados Collection from scratch. Write files to it. The CollectionWriter will upload data to Keep as appropriate, and provide you with the Collection manifest text when you’re finished.

Arguments:

  • api_client: The API client to use to look up Collections. If not provided, CollectionWriter will build one from available Arvados configuration.
  • num_retries: The default number of times to retry failed service requests. Default 0. You may change this value after instantiation, but note those changes may not propagate to related objects like the Keep client.
  • replication: The number of copies of each block to store. If this argument is None or not supplied, replication is the server-provided default if available, otherwise 2.
num_retries
replication
def do_queued_work(self):
1949    def do_queued_work(self):
        """Process every queued file, dirent, and tree until no work remains."""
1950        # The work queue consists of three pieces:
1951        # * _queued_file: The file object we're currently writing to the
1952        #   Collection.
1953        # * _queued_dirents: Entries under the current directory
1954        #   (_queued_trees[0]) that we want to write or recurse through.
1955        #   This may contain files from subdirectories if
1956        #   max_manifest_depth == 0 for this directory.
1957        # * _queued_trees: Directories that should be written as separate
1958        #   streams to the Collection.
1959        # This function handles the smallest piece of work currently queued
1960        # (current file, then current directory, then next directory) until
1961        # no work remains.  The _work_THING methods each do a unit of work on
1962        # THING.  _queue_THING methods add a THING to the work queue.
1963        while True:
1964            if self._queued_file:
1965                self._work_file()
1966            elif self._queued_dirents:
1967                self._work_dirents()
1968            elif self._queued_trees:
1969                self._work_trees()
1970            else:
1971                break
def write_file(self, source, filename=None):
2030    def write_file(self, source, filename=None):
2031        self._queue_file(source, filename)
2032        self.do_queued_work()
def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
2034    def write_directory_tree(self,
2035                             path, stream_name='.', max_manifest_depth=-1):
2036        self._queue_tree(path, stream_name, max_manifest_depth)
2037        self.do_queued_work()
def write(self, newdata):
2039    def write(self, newdata):
2040        if isinstance(newdata, bytes):
2041            pass
2042        elif isinstance(newdata, str):
2043            newdata = newdata.encode()
2044        elif hasattr(newdata, '__iter__'):
2045            for s in newdata:
2046                self.write(s)
2047            return
2048        self._data_buffer.append(newdata)
2049        self._data_buffer_len += len(newdata)
2050        self._current_stream_length += len(newdata)
2051        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2052            self.flush_data()
def open(self, streampath, filename=None):
2054    def open(self, streampath, filename=None):
2055        """open(streampath[, filename]) -> file-like object
2056
2057        Pass in the path of a file to write to the Collection, either as a
2058        single string or as two separate stream name and file name arguments.
2059        This method returns a file-like object you can write to add it to the
2060        Collection.
2061
2062        You may only have one file object from the Collection open at a time,
2063        so be sure to close the object when you're done.  Using the object in
2064        a with statement makes that easy:
2065
2066            with cwriter.open('./doc/page1.txt') as outfile:
2067                outfile.write(page1_data)
2068            with cwriter.open('./doc/page2.txt') as outfile:
2069                outfile.write(page2_data)
2070        """
2071        if filename is None:
            # Only one path argument given: split it into stream and file name.
2072            streampath, filename = split(streampath)
2073        if self._last_open and not self._last_open.closed:
            # Enforce the one-open-file-at-a-time contract described above.
2074            raise errors.AssertionError(
2075                u"can't open '{}' when '{}' is still open".format(
2076                    filename, self._last_open.name))
2077        if streampath != self.current_stream_name():
            # Switch streams before naming the new file in it.
2078            self.start_new_stream(streampath)
2079        self.set_current_file_name(filename)
2080        self._last_open = _WriterFile(self, filename)
2081        return self._last_open

open(streampath[, filename]) -> file-like object

Pass in the path of a file to write to the Collection, either as a single string or as two separate stream name and file name arguments. This method returns a file-like object you can write to add it to the Collection.

You may only have one file object from the Collection open at a time, so be sure to close the object when you’re done. Using the object in a with statement makes that easy:

with cwriter.open('./doc/page1.txt') as outfile:
    outfile.write(page1_data)
with cwriter.open('./doc/page2.txt') as outfile:
    outfile.write(page2_data)
def flush_data(self):
2083    def flush_data(self):
2084        data_buffer = b''.join(self._data_buffer)
2085        if data_buffer:
2086            self._current_stream_locators.append(
2087                self._my_keep().put(
2088                    data_buffer[0:config.KEEP_BLOCK_SIZE],
2089                    copies=self.replication))
2090            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2091            self._data_buffer_len = len(self._data_buffer[0])
def start_new_file(self, newfilename=None):
2093    def start_new_file(self, newfilename=None):
2094        self.finish_current_file()
2095        self.set_current_file_name(newfilename)
def set_current_file_name(self, newfilename):
2097    def set_current_file_name(self, newfilename):
2098        if re.search(r'[\t\n]', newfilename):
2099            raise errors.AssertionError(
2100                "Manifest filenames cannot contain whitespace: %s" %
2101                newfilename)
2102        elif re.search(r'\x00', newfilename):
2103            raise errors.AssertionError(
2104                "Manifest filenames cannot contain NUL characters: %s" %
2105                newfilename)
2106        self._current_file_name = newfilename
def current_file_name(self):
2108    def current_file_name(self):
2109        return self._current_file_name
def finish_current_file(self):
2111    def finish_current_file(self):
2112        if self._current_file_name is None:
2113            if self._current_file_pos == self._current_stream_length:
2114                return
2115            raise errors.AssertionError(
2116                "Cannot finish an unnamed file " +
2117                "(%d bytes at offset %d in '%s' stream)" %
2118                (self._current_stream_length - self._current_file_pos,
2119                 self._current_file_pos,
2120                 self._current_stream_name))
2121        self._current_stream_files.append([
2122                self._current_file_pos,
2123                self._current_stream_length - self._current_file_pos,
2124                self._current_file_name])
2125        self._current_file_pos = self._current_stream_length
2126        self._current_file_name = None
def start_new_stream(self, newstreamname='.'):
2128    def start_new_stream(self, newstreamname='.'):
2129        self.finish_current_stream()
2130        self.set_current_stream_name(newstreamname)
def set_current_stream_name(self, newstreamname):
2132    def set_current_stream_name(self, newstreamname):
2133        if re.search(r'[\t\n]', newstreamname):
2134            raise errors.AssertionError(
2135                "Manifest stream names cannot contain whitespace: '%s'" %
2136                (newstreamname))
2137        self._current_stream_name = '.' if newstreamname=='' else newstreamname
def current_stream_name(self):
2139    def current_stream_name(self):
2140        return self._current_stream_name
def finish_current_stream(self):
2142    def finish_current_stream(self):
2143        self.finish_current_file()
2144        self.flush_data()
2145        if not self._current_stream_files:
2146            pass
2147        elif self._current_stream_name is None:
2148            raise errors.AssertionError(
2149                "Cannot finish an unnamed stream (%d bytes in %d files)" %
2150                (self._current_stream_length, len(self._current_stream_files)))
2151        else:
2152            if not self._current_stream_locators:
2153                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2154            self._finished_streams.append([self._current_stream_name,
2155                                           self._current_stream_locators,
2156                                           self._current_stream_files])
2157        self._current_stream_files = []
2158        self._current_stream_length = 0
2159        self._current_stream_locators = []
2160        self._current_stream_name = None
2161        self._current_file_pos = 0
2162        self._current_file_name = None
def finish(self):
2164    def finish(self):
2165        """Store the manifest in Keep and return its locator.
2166
2167        This is useful for storing manifest fragments (task outputs)
2168        temporarily in Keep during a Crunch job.
2169
2170        In other cases you should make a collection instead, by
2171        sending manifest_text() to the API server's "create
2172        collection" endpoint.
2173        """
2174        return self._my_keep().put(self.manifest_text().encode(),
2175                                   copies=self.replication)

Store the manifest in Keep and return its locator.

This is useful for storing manifest fragments (task outputs) temporarily in Keep during a Crunch job.

In other cases you should make a collection instead, by sending manifest_text() to the API server’s “create collection” endpoint.

def portable_data_hash(self):
2177    def portable_data_hash(self):
2178        stripped = self.stripped_manifest().encode()
2179        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
def manifest_text(self):
2181    def manifest_text(self):
2182        self.finish_current_stream()
2183        manifest = ''
2184
2185        for stream in self._finished_streams:
2186            if not re.search(r'^\.(/.*)?$', stream[0]):
2187                manifest += './'
2188            manifest += stream[0].replace(' ', '\\040')
2189            manifest += ' ' + ' '.join(stream[1])
2190            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2191            manifest += "\n"
2192
2193        return manifest
def data_locators(self):
2195    def data_locators(self):
2196        ret = []
2197        for name, locators, files in self._finished_streams:
2198            ret += locators
2199        return ret
def save_new(self, name=None):
2201    def save_new(self, name=None):
2202        return self._api_client.collections().create(
2203            ensure_unique_name=True,
2204            body={
2205                'name': name,
2206                'manifest_text': self.manifest_text(),
2207            }).execute(num_retries=self.num_retries)
Inherited Members
CollectionBase
stripped_manifest
class ResumableCollectionWriter(CollectionWriter):
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can serialize internal state to disk

    .. WARNING:: Deprecated
       This class is deprecated. Prefer `arvados.collection.Collection`
       instead.
    """

    # Attributes captured by dump_state() and restored by from_state()
    # (plus the synthetic '_current_file' entry for the open file).
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, **kwargs):
        # Maps source path -> stat tuple; used to detect source file
        # changes when resuming (see check_dependencies).
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        """Build a writer from state previously returned by dump_state().

        If the state is not suitable to resume (because files have changed,
        been deleted, aren't predictable, etc.), raise a
        StaleWriterStateError.  Otherwise, return the initialized writer.
        The caller is responsible for calling writer.do_queued_work()
        appropriately after it's returned.
        """
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source file changed.

        A dependency is stale when it is no longer a regular file, cannot
        be stat'ed, or its mtime or size differs from the recorded value.
        """
        for path, orig_stat in listitems(self._dependencies):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(u"{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    u"failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError(u"{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a serializable snapshot of this writer's state.

        copy_func is applied to every state value; pass e.g. copy.deepcopy
        if the caller will keep using this writer afterwards.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        """Queue a file path for writing and record its stat as a dependency."""
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError(u"{} not a file path".format(source))
        # Keep the stat failure in a plain local: Python 3 unbinds the
        # `except ... as` name when the handler exits, so referencing it
        # later (as the previous code did) raised NameError instead of
        # reporting the real stat error.
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            stat_error = error
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                u"could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                u"{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Write data, refusing any that is not sourced from a queued file."""
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)

CollectionWriter that can serialize internal state to disk

@arvados.util._deprecated('3.0', 'arvados.collection.Collection')
ResumableCollectionWriter(api_client=None, **kwargs)
2224    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
2225    def __init__(self, api_client=None, **kwargs):
2226        self._dependencies = {}
2227        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
STATE_PROPS = ['_current_stream_files', '_current_stream_length', '_current_stream_locators', '_current_stream_name', '_current_file_name', '_current_file_pos', '_close_file', '_data_buffer', '_dependencies', '_finished_streams', '_queued_dirents', '_queued_trees']
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):
2229    @classmethod
2230    def from_state(cls, state, *init_args, **init_kwargs):
2231        # Try to build a new writer from scratch with the given state.
2232        # If the state is not suitable to resume (because files have changed,
2233        # been deleted, aren't predictable, etc.), raise a
2234        # StaleWriterStateError.  Otherwise, return the initialized writer.
2235        # The caller is responsible for calling writer.do_queued_work()
2236        # appropriately after it's returned.
2237        writer = cls(*init_args, **init_kwargs)
2238        for attr_name in cls.STATE_PROPS:
2239            attr_value = state[attr_name]
2240            attr_class = getattr(writer, attr_name).__class__
2241            # Coerce the value into the same type as the initial value, if
2242            # needed.
2243            if attr_class not in (type(None), attr_value.__class__):
2244                attr_value = attr_class(attr_value)
2245            setattr(writer, attr_name, attr_value)
2246        # Check dependencies before we try to resume anything.
2247        if any(KeepLocator(ls).permission_expired()
2248               for ls in writer._current_stream_locators):
2249            raise errors.StaleWriterStateError(
2250                "locators include expired permission hint")
2251        writer.check_dependencies()
2252        if state['_current_file'] is not None:
2253            path, pos = state['_current_file']
2254            try:
2255                writer._queued_file = open(path, 'rb')
2256                writer._queued_file.seek(pos)
2257            except IOError as error:
2258                raise errors.StaleWriterStateError(
2259                    u"failed to reopen active file {}: {}".format(path, error))
2260        return writer
def check_dependencies(self):
2262    def check_dependencies(self):
2263        for path, orig_stat in listitems(self._dependencies):
2264            if not S_ISREG(orig_stat[ST_MODE]):
2265                raise errors.StaleWriterStateError(u"{} not file".format(path))
2266            try:
2267                now_stat = tuple(os.stat(path))
2268            except OSError as error:
2269                raise errors.StaleWriterStateError(
2270                    u"failed to stat {}: {}".format(path, error))
2271            if ((not S_ISREG(now_stat[ST_MODE])) or
2272                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
2273                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
2274                raise errors.StaleWriterStateError(u"{} changed".format(path))
def dump_state(self, copy_func=<function ResumableCollectionWriter.<lambda>>):
2276    def dump_state(self, copy_func=lambda x: x):
2277        state = {attr: copy_func(getattr(self, attr))
2278                 for attr in self.STATE_PROPS}
2279        if self._queued_file is None:
2280            state['_current_file'] = None
2281        else:
2282            state['_current_file'] = (os.path.realpath(self._queued_file.name),
2283                                      self._queued_file.tell())
2284        return state
def write(self, data):
2310    def write(self, data):
2311        if self._queued_file is None:
2312            raise errors.AssertionError(
2313                "resumable writer can't accept unsourced data")
2314        return super(ResumableCollectionWriter, self).write(data)