arvados.collection

Tools to work with Arvados collections

This module provides high-level interfaces to create, read, and update Arvados collections. Most users will want to instantiate Collection objects, and use methods like Collection.open and Collection.mkdirs to read and write data in the collection. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4"""Tools to work with Arvados collections
   5
   6This module provides high-level interfaces to create, read, and update
   7Arvados collections. Most users will want to instantiate `Collection`
   8objects, and use methods like `Collection.open` and `Collection.mkdirs` to
   9read and write data in the collection. Refer to the Arvados Python SDK
  10cookbook for [an introduction to using the Collection class][cookbook].
  11
  12[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
  13"""
  14
  15import ciso8601
  16import datetime
  17import errno
  18import functools
  19import hashlib
  20import io
  21import logging
  22import os
  23import re
  24import sys
  25import threading
  26import time
  27
  28from collections import deque
  29from stat import *
  30
  31from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
  32from .keep import KeepLocator, KeepClient
  33from ._normalize_stream import normalize_stream, escape
  34from ._ranges import Range, LocatorAndRange
  35from .safeapi import ThreadSafeApiCache
  36import arvados.config as config
  37import arvados.errors as errors
  38import arvados.util
  39import arvados.events as events
  40from arvados.retry import retry_method
  41
  42from typing import (
  43    Any,
  44    Callable,
  45    Dict,
  46    IO,
  47    Iterator,
  48    List,
  49    Mapping,
  50    Optional,
  51    Tuple,
  52    Union,
  53)
  54
  55if sys.version_info < (3, 8):
  56    from typing_extensions import Literal
  57else:
  58    from typing import Literal
  59
  60_logger = logging.getLogger('arvados.collection')
  61
  62ADD = "add"
  63"""Argument value for `Collection` methods to represent an added item"""
  64DEL = "del"
  65"""Argument value for `Collection` methods to represent a removed item"""
  66MOD = "mod"
  67"""Argument value for `Collection` methods to represent a modified item"""
  68TOK = "tok"
  69"""Argument value for `Collection` methods to represent an item with token differences"""
  70FILE = "file"
  71"""`create_type` value for `Collection.find_or_create`"""
  72COLLECTION = "collection"
  73"""`create_type` value for `Collection.find_or_create`"""
  74
  75ChangeList = List[Union[
  76    Tuple[Literal[ADD, DEL], str, 'Collection'],
  77    Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
  78]]
  79ChangeType = Literal[ADD, DEL, MOD, TOK]
  80CollectionItem = Union[ArvadosFile, 'Collection']
  81ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
  82CreateType = Literal[COLLECTION, FILE]
  83Properties = Dict[str, Any]
  84StorageClasses = List[str]
  85
  86class CollectionBase(object):
  87    """Abstract base class for Collection classes
  88
  89    .. ATTENTION:: Internal
  90       This class is meant to be used by other parts of the SDK. User code
  91       should instantiate or subclass `Collection` or one of its subclasses
  92       directly.
  93    """
  94
  95    def __enter__(self):
  96        """Enter a context block with this collection instance"""
  97        return self
  98
  99    def __exit__(self, exc_type, exc_value, traceback):
 100        """Exit a context block with this collection instance"""
 101        pass
 102
 103    def _my_keep(self):
 104        if self._keep_client is None:
 105            self._keep_client = KeepClient(api_client=self._api_client,
 106                                           num_retries=self.num_retries)
 107        return self._keep_client
 108
 109    def stripped_manifest(self) -> str:
 110        """Create a copy of the collection manifest with only size hints
 111
 112        This method returns a string with the current collection's manifest
 113        text with all non-portable locator hints like permission hints and
 114        remote cluster hints removed. The only hints in the returned manifest
 115        will be size hints.
 116        """
 117        raw = self.manifest_text()
 118        clean = []
 119        for line in raw.split("\n"):
 120            fields = line.split()
 121            if fields:
 122                clean_fields = fields[:1] + [
 123                    (re.sub(r'\+[^\d][^\+]*', '', x)
 124                     if re.match(arvados.util.keep_locator_pattern, x)
 125                     else x)
 126                    for x in fields[1:]]
 127                clean += [' '.join(clean_fields), "\n"]
 128        return ''.join(clean)
 129
 130
 131class _WriterFile(_FileLikeObjectBase):
 132    def __init__(self, coll_writer, name):
 133        super(_WriterFile, self).__init__(name, 'wb')
 134        self.dest = coll_writer
 135
 136    def close(self):
 137        super(_WriterFile, self).close()
 138        self.dest.finish_current_file()
 139
 140    @_FileLikeObjectBase._before_close
 141    def write(self, data):
 142        self.dest.write(data)
 143
 144    @_FileLikeObjectBase._before_close
 145    def writelines(self, seq):
 146        for data in seq:
 147            self.write(data)
 148
 149    @_FileLikeObjectBase._before_close
 150    def flush(self):
 151        self.dest.flush_data()
 152
 153
 154class RichCollectionBase(CollectionBase):
 155    """Base class for Collection classes
 156
 157    .. ATTENTION:: Internal
 158       This class is meant to be used by other parts of the SDK. User code
 159       should instantiate or subclass `Collection` or one of its subclasses
 160       directly.
 161    """
 162
 163    def __init__(self, parent=None):
 164        self.parent = parent
 165        self._committed = False
 166        self._has_remote_blocks = False
 167        self._callback = None
 168        self._items = {}
 169
 170    def _my_api(self):
 171        raise NotImplementedError()
 172
 173    def _my_keep(self):
 174        raise NotImplementedError()
 175
 176    def _my_block_manager(self):
 177        raise NotImplementedError()
 178
 179    def writable(self) -> bool:
 180        """Indicate whether this collection object can be modified
 181
 182        This method returns `False` if this object is a `CollectionReader`,
 183        else `True`.
 184        """
 185        raise NotImplementedError()
 186
 187    def root_collection(self) -> 'Collection':
 188        """Get this collection's root collection object
 189
 190        If you open a subcollection with `Collection.find`, calling this method
 191        on that subcollection returns the source Collection object.
 192        """
 193        raise NotImplementedError()
 194
 195    def stream_name(self) -> str:
 196        """Get the name of the manifest stream represented by this collection
 197
 198        If you open a subcollection with `Collection.find`, calling this method
 199        on that subcollection returns the name of the stream you opened.
 200        """
 201        raise NotImplementedError()
 202
 203    @synchronized
 204    def has_remote_blocks(self) -> bool:
 205        """Indiciate whether the collection refers to remote data
 206
 207        Returns `True` if the collection manifest includes any Keep locators
 208        with a remote hint (`+R`), else `False`.
 209        """
 210        if self._has_remote_blocks:
 211            return True
 212        for item in self:
 213            if self[item].has_remote_blocks():
 214                return True
 215        return False
 216
 217    @synchronized
 218    def set_has_remote_blocks(self, val: bool) -> None:
 219        """Cache whether this collection refers to remote blocks
 220
 221        .. ATTENTION:: Internal
 222           This method is only meant to be used by other Collection methods.
 223
 224        Set this collection's cached "has remote blocks" flag to the given
 225        value.
 226        """
 227        self._has_remote_blocks = val
 228        if self.parent:
 229            self.parent.set_has_remote_blocks(val)
 230
 231    @must_be_writable
 232    @synchronized
 233    def find_or_create(
 234            self,
 235            path: str,
 236            create_type: CreateType,
 237    ) -> CollectionItem:
 238        """Get the item at the given path, creating it if necessary
 239
 240        If `path` refers to a stream in this collection, returns a
 241        corresponding `Subcollection` object. If `path` refers to a file in
 242        this collection, returns a corresponding
 243        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 244        this collection, then this method creates a new object and returns
 245        it, creating parent streams as needed. The type of object created is
 246        determined by the value of `create_type`.
 247
 248        Arguments:
 249
 250        * path: str --- The path to find or create within this collection.
 251
 252        * create_type: Literal[COLLECTION, FILE] --- The type of object to
 253          create at `path` if one does not exist. Passing `COLLECTION`
 254          creates a stream and returns the corresponding
 255          `Subcollection`. Passing `FILE` creates a new file and returns the
 256          corresponding `arvados.arvfile.ArvadosFile`.
 257        """
 258        pathcomponents = path.split("/", 1)
 259        if pathcomponents[0]:
 260            item = self._items.get(pathcomponents[0])
 261            if len(pathcomponents) == 1:
 262                if item is None:
 263                    # create new file
 264                    if create_type == COLLECTION:
 265                        item = Subcollection(self, pathcomponents[0])
 266                    else:
 267                        item = ArvadosFile(self, pathcomponents[0])
 268                    self._items[pathcomponents[0]] = item
 269                    self.set_committed(False)
 270                    self.notify(ADD, self, pathcomponents[0], item)
 271                return item
 272            else:
 273                if item is None:
 274                    # create new collection
 275                    item = Subcollection(self, pathcomponents[0])
 276                    self._items[pathcomponents[0]] = item
 277                    self.set_committed(False)
 278                    self.notify(ADD, self, pathcomponents[0], item)
 279                if isinstance(item, RichCollectionBase):
 280                    return item.find_or_create(pathcomponents[1], create_type)
 281                else:
 282                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 283        else:
 284            return self
 285
 286    @synchronized
 287    def find(self, path: str) -> CollectionItem:
 288        """Get the item at the given path
 289
 290        If `path` refers to a stream in this collection, returns a
 291        corresponding `Subcollection` object. If `path` refers to a file in
 292        this collection, returns a corresponding
 293        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 294        this collection, then this method raises `NotADirectoryError`.
 295
 296        Arguments:
 297
 298        * path: str --- The path to find or create within this collection.
 299        """
 300        if not path:
 301            raise errors.ArgumentError("Parameter 'path' is empty.")
 302
 303        pathcomponents = path.split("/", 1)
 304        if pathcomponents[0] == '':
 305            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 306
 307        item = self._items.get(pathcomponents[0])
 308        if item is None:
 309            return None
 310        elif len(pathcomponents) == 1:
 311            return item
 312        else:
 313            if isinstance(item, RichCollectionBase):
 314                if pathcomponents[1]:
 315                    return item.find(pathcomponents[1])
 316                else:
 317                    return item
 318            else:
 319                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 320
 321    @synchronized
 322    def mkdirs(self, path: str) -> 'Subcollection':
 323        """Create and return a subcollection at `path`
 324
 325        If `path` exists within this collection, raises `FileExistsError`.
 326        Otherwise, creates a stream at that path and returns the
 327        corresponding `Subcollection`.
 328        """
 329        if self.find(path) != None:
 330            raise IOError(errno.EEXIST, "Directory or file exists", path)
 331
 332        return self.find_or_create(path, COLLECTION)
 333
 334    def open(
 335            self,
 336            path: str,
 337            mode: str="r",
 338            encoding: Optional[str]=None
 339    ) -> IO:
 340        """Open a file-like object within the collection
 341
 342        This method returns a file-like object that can read and/or write the
 343        file located at `path` within the collection. If you attempt to write
 344        a `path` that does not exist, the file is created with `find_or_create`.
 345        If the file cannot be opened for any other reason, this method raises
 346        `OSError` with an appropriate errno.
 347
 348        Arguments:
 349
 350        * path: str --- The path of the file to open within this collection
 351
 352        * mode: str --- The mode to open this file. Supports all the same
 353          values as `builtins.open`.
 354
 355        * encoding: str | None --- The text encoding of the file. Only used
 356          when the file is opened in text mode. The default is
 357          platform-dependent.
 358
 359        """
 360        if not re.search(r'^[rwa][bt]?\+?$', mode):
 361            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 362
 363        if mode[0] == 'r' and '+' not in mode:
 364            fclass = ArvadosFileReader
 365            arvfile = self.find(path)
 366        elif not self.writable():
 367            raise IOError(errno.EROFS, "Collection is read only")
 368        else:
 369            fclass = ArvadosFileWriter
 370            arvfile = self.find_or_create(path, FILE)
 371
 372        if arvfile is None:
 373            raise IOError(errno.ENOENT, "File not found", path)
 374        if not isinstance(arvfile, ArvadosFile):
 375            raise IOError(errno.EISDIR, "Is a directory", path)
 376
 377        if mode[0] == 'w':
 378            arvfile.truncate(0)
 379
 380        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
 381        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
 382        if 'b' not in mode:
 383            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
 384            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
 385        return f
 386
 387    def modified(self) -> bool:
 388        """Indicate whether this collection has an API server record
 389
 390        Returns `False` if this collection corresponds to a record loaded from
 391        the API server, `True` otherwise.
 392        """
 393        return not self.committed()
 394
 395    @synchronized
 396    def committed(self):
 397        """Indicate whether this collection has an API server record
 398
 399        Returns `True` if this collection corresponds to a record loaded from
 400        the API server, `False` otherwise.
 401        """
 402        return self._committed
 403
 404    @synchronized
 405    def set_committed(self, value: bool=True):
 406        """Cache whether this collection has an API server record
 407
 408        .. ATTENTION:: Internal
 409           This method is only meant to be used by other Collection methods.
 410
 411        Set this collection's cached "committed" flag to the given
 412        value and propagates it as needed.
 413        """
 414        if value == self._committed:
 415            return
 416        if value:
 417            for k,v in self._items.items():
 418                v.set_committed(True)
 419            self._committed = True
 420        else:
 421            self._committed = False
 422            if self.parent is not None:
 423                self.parent.set_committed(False)
 424
 425    @synchronized
 426    def __iter__(self) -> Iterator[str]:
 427        """Iterate names of streams and files in this collection
 428
 429        This method does not recurse. It only iterates the contents of this
 430        collection's corresponding stream.
 431        """
 432        return iter(self._items)
 433
 434    @synchronized
 435    def __getitem__(self, k: str) -> CollectionItem:
 436        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
 437
 438        This method does not recurse. If you want to search a path, use
 439        `RichCollectionBase.find` instead.
 440        """
 441        return self._items[k]
 442
 443    @synchronized
 444    def __contains__(self, k: str) -> bool:
 445        """Indicate whether this collection has an item with this name
 446
 447        This method does not recurse. It you want to check a path, use
 448        `RichCollectionBase.exists` instead.
 449        """
 450        return k in self._items
 451
 452    @synchronized
 453    def __len__(self):
 454        """Get the number of items directly contained in this collection
 455
 456        This method does not recurse. It only counts the streams and files
 457        in this collection's corresponding stream.
 458        """
 459        return len(self._items)
 460
 461    @must_be_writable
 462    @synchronized
 463    def __delitem__(self, p: str) -> None:
 464        """Delete an item from this collection's stream
 465
 466        This method does not recurse. If you want to remove an item by a
 467        path, use `RichCollectionBase.remove` instead.
 468        """
 469        del self._items[p]
 470        self.set_committed(False)
 471        self.notify(DEL, self, p, None)
 472
 473    @synchronized
 474    def keys(self) -> Iterator[str]:
 475        """Iterate names of streams and files in this collection
 476
 477        This method does not recurse. It only iterates the contents of this
 478        collection's corresponding stream.
 479        """
 480        return self._items.keys()
 481
 482    @synchronized
 483    def values(self) -> List[CollectionItem]:
 484        """Get a list of objects in this collection's stream
 485
 486        The return value includes a `Subcollection` for every stream, and an
 487        `arvados.arvfile.ArvadosFile` for every file, directly within this
 488        collection's stream.  This method does not recurse.
 489        """
 490        return list(self._items.values())
 491
 492    @synchronized
 493    def items(self) -> List[Tuple[str, CollectionItem]]:
 494        """Get a list of `(name, object)` tuples from this collection's stream
 495
 496        The return value includes a `Subcollection` for every stream, and an
 497        `arvados.arvfile.ArvadosFile` for every file, directly within this
 498        collection's stream.  This method does not recurse.
 499        """
 500        return list(self._items.items())
 501
 502    def exists(self, path: str) -> bool:
 503        """Indicate whether this collection includes an item at `path`
 504
 505        This method returns `True` if `path` refers to a stream or file within
 506        this collection, else `False`.
 507
 508        Arguments:
 509
 510        * path: str --- The path to check for existence within this collection
 511        """
 512        return self.find(path) is not None
 513
 514    @must_be_writable
 515    @synchronized
 516    def remove(self, path: str, recursive: bool=False) -> None:
 517        """Remove the file or stream at `path`
 518
 519        Arguments:
 520
 521        * path: str --- The path of the item to remove from the collection
 522
 523        * recursive: bool --- Controls the method's behavior if `path` refers
 524          to a nonempty stream. If `False` (the default), this method raises
 525          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
 526          items under the stream.
 527        """
 528        if not path:
 529            raise errors.ArgumentError("Parameter 'path' is empty.")
 530
 531        pathcomponents = path.split("/", 1)
 532        item = self._items.get(pathcomponents[0])
 533        if item is None:
 534            raise IOError(errno.ENOENT, "File not found", path)
 535        if len(pathcomponents) == 1:
 536            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
 537                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
 538            deleteditem = self._items[pathcomponents[0]]
 539            del self._items[pathcomponents[0]]
 540            self.set_committed(False)
 541            self.notify(DEL, self, pathcomponents[0], deleteditem)
 542        else:
 543            item.remove(pathcomponents[1], recursive=recursive)
 544
 545    def _clonefrom(self, source):
 546        for k,v in source.items():
 547            self._items[k] = v.clone(self, k)
 548
 549    def clone(self):
 550        raise NotImplementedError()
 551
 552    @must_be_writable
 553    @synchronized
 554    def add(
 555            self,
 556            source_obj: CollectionItem,
 557            target_name: str,
 558            overwrite: bool=False,
 559            reparent: bool=False,
 560    ) -> None:
 561        """Copy or move a file or subcollection object to this collection
 562
 563        Arguments:
 564
 565        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
 566          to add to this collection
 567
 568        * target_name: str --- The path inside this collection where
 569          `source_obj` should be added.
 570
 571        * overwrite: bool --- Controls the behavior of this method when the
 572          collection already contains an object at `target_name`. If `False`
 573          (the default), this method will raise `FileExistsError`. If `True`,
 574          the object at `target_name` will be replaced with `source_obj`.
 575
 576        * reparent: bool --- Controls whether this method copies or moves
 577          `source_obj`. If `False` (the default), `source_obj` is copied into
 578          this collection. If `True`, `source_obj` is moved into this
 579          collection.
 580        """
 581        if target_name in self and not overwrite:
 582            raise IOError(errno.EEXIST, "File already exists", target_name)
 583
 584        modified_from = None
 585        if target_name in self:
 586            modified_from = self[target_name]
 587
 588        # Actually make the move or copy.
 589        if reparent:
 590            source_obj._reparent(self, target_name)
 591            item = source_obj
 592        else:
 593            item = source_obj.clone(self, target_name)
 594
 595        self._items[target_name] = item
 596        self.set_committed(False)
 597        if not self._has_remote_blocks and source_obj.has_remote_blocks():
 598            self.set_has_remote_blocks(True)
 599
 600        if modified_from:
 601            self.notify(MOD, self, target_name, (modified_from, item))
 602        else:
 603            self.notify(ADD, self, target_name, item)
 604
 605    def _get_src_target(self, source, target_path, source_collection, create_dest):
 606        if source_collection is None:
 607            source_collection = self
 608
 609        # Find the object
 610        if isinstance(source, str):
 611            source_obj = source_collection.find(source)
 612            if source_obj is None:
 613                raise IOError(errno.ENOENT, "File not found", source)
 614            sourcecomponents = source.split("/")
 615        else:
 616            source_obj = source
 617            sourcecomponents = None
 618
 619        # Find parent collection the target path
 620        targetcomponents = target_path.split("/")
 621
 622        # Determine the name to use.
 623        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 624
 625        if not target_name:
 626            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 627
 628        if create_dest:
 629            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 630        else:
 631            if len(targetcomponents) > 1:
 632                target_dir = self.find("/".join(targetcomponents[0:-1]))
 633            else:
 634                target_dir = self
 635
 636        if target_dir is None:
 637            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 638
 639        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
 640            target_dir = target_dir[target_name]
 641            target_name = sourcecomponents[-1]
 642
 643        return (source_obj, target_dir, target_name)
 644
 645    @must_be_writable
 646    @synchronized
 647    def copy(
 648            self,
 649            source: Union[str, CollectionItem],
 650            target_path: str,
 651            source_collection: Optional['RichCollectionBase']=None,
 652            overwrite: bool=False,
 653    ) -> None:
 654        """Copy a file or subcollection object to this collection
 655
 656        Arguments:
 657
 658        * source: str | arvados.arvfile.ArvadosFile |
 659          arvados.collection.Subcollection --- The file or subcollection to
 660          add to this collection. If `source` is a str, the object will be
 661          found by looking up this path from `source_collection` (see
 662          below).
 663
 664        * target_path: str --- The path inside this collection where the
 665          source object should be added.
 666
 667        * source_collection: arvados.collection.Collection | None --- The
 668          collection to find the source object from when `source` is a
 669          path. Defaults to the current collection (`self`).
 670
 671        * overwrite: bool --- Controls the behavior of this method when the
 672          collection already contains an object at `target_path`. If `False`
 673          (the default), this method will raise `FileExistsError`. If `True`,
 674          the object at `target_path` will be replaced with `source_obj`.
 675        """
 676        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 677        target_dir.add(source_obj, target_name, overwrite, False)
 678
 679    @must_be_writable
 680    @synchronized
 681    def rename(
 682            self,
 683            source: Union[str, CollectionItem],
 684            target_path: str,
 685            source_collection: Optional['RichCollectionBase']=None,
 686            overwrite: bool=False,
 687    ) -> None:
 688        """Move a file or subcollection object to this collection
 689
 690        Arguments:
 691
 692        * source: str | arvados.arvfile.ArvadosFile |
 693          arvados.collection.Subcollection --- The file or subcollection to
 694          add to this collection. If `source` is a str, the object will be
 695          found by looking up this path from `source_collection` (see
 696          below).
 697
 698        * target_path: str --- The path inside this collection where the
 699          source object should be added.
 700
 701        * source_collection: arvados.collection.Collection | None --- The
 702          collection to find the source object from when `source` is a
 703          path. Defaults to the current collection (`self`).
 704
 705        * overwrite: bool --- Controls the behavior of this method when the
 706          collection already contains an object at `target_path`. If `False`
 707          (the default), this method will raise `FileExistsError`. If `True`,
 708          the object at `target_path` will be replaced with `source_obj`.
 709        """
 710        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 711        if not source_obj.writable():
 712            raise IOError(errno.EROFS, "Source collection is read only", source)
 713        target_dir.add(source_obj, target_name, overwrite, True)
 714
 715    def portable_manifest_text(self, stream_name: str=".") -> str:
 716        """Get the portable manifest text for this collection
 717
 718        The portable manifest text is normalized, and does not include access
 719        tokens. This method does not flush outstanding blocks to Keep.
 720
 721        Arguments:
 722
 723        * stream_name: str --- The name to use for this collection's stream in
 724          the generated manifest. Default `'.'`.
 725        """
 726        return self._get_manifest_text(stream_name, True, True)
 727
 728    @synchronized
 729    def manifest_text(
 730            self,
 731            stream_name: str=".",
 732            strip: bool=False,
 733            normalize: bool=False,
 734            only_committed: bool=False,
 735    ) -> str:
 736        """Get the manifest text for this collection
 737
 738        Arguments:
 739
 740        * stream_name: str --- The name to use for this collection's stream in
 741          the generated manifest. Default `'.'`.
 742
 743        * strip: bool --- Controls whether or not the returned manifest text
 744          includes access tokens. If `False` (the default), the manifest text
 745          will include access tokens. If `True`, the manifest text will not
 746          include access tokens.
 747
 748        * normalize: bool --- Controls whether or not the returned manifest
 749          text is normalized. Default `False`.
 750
 751        * only_committed: bool --- Controls whether or not this method uploads
 752          pending data to Keep before building and returning the manifest text.
 753          If `False` (the default), this method will finish uploading all data
 754          to Keep, then return the final manifest. If `True`, this method will
 755          build and return a manifest that only refers to the data that has
 756          finished uploading at the time this method was called.
 757        """
 758        if not only_committed:
 759            self._my_block_manager().commit_all()
 760        return self._get_manifest_text(stream_name, strip, normalize,
 761                                       only_committed=only_committed)
 762
 763    @synchronized
 764    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
 765        """Get the manifest text for this collection, sub collections and files.
 766
 767        :stream_name:
 768          Name to use for this stream (directory)
 769
 770        :strip:
 771          If True, remove signing tokens from block locators if present.
 772          If False (default), block locators are left unchanged.
 773
 774        :normalize:
 775          If True, always export the manifest text in normalized form
 776          even if the Collection is not modified.  If False (default) and the collection
 777          is not modified, return the original manifest text even if it is not
 778          in normalized form.
 779
 780        :only_committed:
 781          If True, only include blocks that were already committed to Keep.
 782
 783        """
 784
 785        if not self.committed() or self._manifest_text is None or normalize:
 786            stream = {}
 787            buf = []
 788            sorted_keys = sorted(self.keys())
 789            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
 790                # Create a stream per file `k`
 791                arvfile = self[filename]
 792                filestream = []
 793                for segment in arvfile.segments():
 794                    loc = segment.locator
 795                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
 796                        if only_committed:
 797                            continue
 798                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
 799                    if strip:
 800                        loc = KeepLocator(loc).stripped()
 801                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
 802                                         segment.segment_offset, segment.range_size))
 803                stream[filename] = filestream
 804            if stream:
 805                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
 806            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
 807                buf.append(self[dirname].manifest_text(
 808                    stream_name=os.path.join(stream_name, dirname),
 809                    strip=strip, normalize=True, only_committed=only_committed))
 810            return "".join(buf)
 811        else:
 812            if strip:
 813                return self.stripped_manifest()
 814            else:
 815                return self._manifest_text
 816
 817    @synchronized
 818    def _copy_remote_blocks(self, remote_blocks={}):
 819        """Scan through the entire collection and ask Keep to copy remote blocks.
 820
 821        When accessing a remote collection, blocks will have a remote signature
 822        (+R instead of +A). Collect these signatures and request Keep to copy the
 823        blocks to the local cluster, returning local (+A) signatures.
 824
 825        :remote_blocks:
 826          Shared cache of remote to local block mappings. This is used to avoid
 827          doing extra work when blocks are shared by more than one file in
 828          different subdirectories.
 829
 830        """
 831        for item in self:
 832            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
 833        return remote_blocks
 834
 835    @synchronized
 836    def diff(
 837            self,
 838            end_collection: 'RichCollectionBase',
 839            prefix: str=".",
 840            holding_collection: Optional['Collection']=None,
 841    ) -> ChangeList:
 842        """Build a list of differences between this collection and another
 843
 844        Arguments:
 845
 846        * end_collection: arvados.collection.RichCollectionBase --- A
 847          collection object with the desired end state. The returned diff
 848          list will describe how to go from the current collection object
 849          `self` to `end_collection`.
 850
 851        * prefix: str --- The name to use for this collection's stream in
 852          the diff list. Default `'.'`.
 853
 854        * holding_collection: arvados.collection.Collection | None --- A
 855          collection object used to hold objects for the returned diff
 856          list. By default, a new empty collection is created.
 857        """
 858        changes = []
 859        if holding_collection is None:
 860            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
 861        for k in self:
 862            if k not in end_collection:
 863               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
 864        for k in end_collection:
 865            if k in self:
 866                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
 867                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
 868                elif end_collection[k] != self[k]:
 869                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 870                else:
 871                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 872            else:
 873                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
 874        return changes
 875
 876    @must_be_writable
 877    @synchronized
 878    def apply(self, changes: ChangeList) -> None:
 879        """Apply a list of changes from to this collection
 880
 881        This method takes a list of changes generated by
 882        `RichCollectionBase.diff` and applies it to this
 883        collection. Afterward, the state of this collection object will
 884        match the state of `end_collection` passed to `diff`. If a change
 885        conflicts with a local change, it will be saved to an alternate path
 886        indicating the conflict.
 887
 888        Arguments:
 889
 890        * changes: arvados.collection.ChangeList --- The list of differences
 891          generated by `RichCollectionBase.diff`.
 892        """
 893        if changes:
 894            self.set_committed(False)
 895        for change in changes:
 896            event_type = change[0]
 897            path = change[1]
 898            initial = change[2]
 899            local = self.find(path)
 900            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
 901                                                                    time.gmtime()))
 902            if event_type == ADD:
 903                if local is None:
 904                    # No local file at path, safe to copy over new file
 905                    self.copy(initial, path)
 906                elif local is not None and local != initial:
 907                    # There is already local file and it is different:
 908                    # save change to conflict file.
 909                    self.copy(initial, conflictpath)
 910            elif event_type == MOD or event_type == TOK:
 911                final = change[3]
 912                if local == initial:
 913                    # Local matches the "initial" item so it has not
 914                    # changed locally and is safe to update.
 915                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
 916                        # Replace contents of local file with new contents
 917                        local.replace_contents(final)
 918                    else:
 919                        # Overwrite path with new item; this can happen if
 920                        # path was a file and is now a collection or vice versa
 921                        self.copy(final, path, overwrite=True)
 922                else:
 923                    # Local is missing (presumably deleted) or local doesn't
 924                    # match the "start" value, so save change to conflict file
 925                    self.copy(final, conflictpath)
 926            elif event_type == DEL:
 927                if local == initial:
 928                    # Local item matches "initial" value, so it is safe to remove.
 929                    self.remove(path, recursive=True)
 930                # else, the file is modified or already removed, in either
 931                # case we don't want to try to remove it.
 932
 933    def portable_data_hash(self) -> str:
 934        """Get the portable data hash for this collection's manifest"""
 935        if self._manifest_locator and self.committed():
 936            # If the collection is already saved on the API server, and it's committed
 937            # then return API server's PDH response.
 938            return self._portable_data_hash
 939        else:
 940            stripped = self.portable_manifest_text().encode()
 941            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 942
 943    @synchronized
 944    def subscribe(self, callback: ChangeCallback) -> None:
 945        """Set a notify callback for changes to this collection
 946
 947        Arguments:
 948
 949        * callback: arvados.collection.ChangeCallback --- The callable to
 950          call each time the collection is changed.
 951        """
 952        if self._callback is None:
 953            self._callback = callback
 954        else:
 955            raise errors.ArgumentError("A callback is already set on this collection.")
 956
 957    @synchronized
 958    def unsubscribe(self) -> None:
 959        """Remove any notify callback set for changes to this collection"""
 960        if self._callback is not None:
 961            self._callback = None
 962
 963    @synchronized
 964    def notify(
 965            self,
 966            event: ChangeType,
 967            collection: 'RichCollectionBase',
 968            name: str,
 969            item: CollectionItem,
 970    ) -> None:
 971        """Notify any subscribed callback about a change to this collection
 972
 973        .. ATTENTION:: Internal
 974           This method is only meant to be used by other Collection methods.
 975
 976        If a callback has been registered with `RichCollectionBase.subscribe`,
 977        it will be called with information about a change to this collection.
 978        Then this notification will be propagated to this collection's root.
 979
 980        Arguments:
 981
 982        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 983          the collection.
 984
 985        * collection: arvados.collection.RichCollectionBase --- The
 986          collection that was modified.
 987
 988        * name: str --- The name of the file or stream within `collection` that
 989          was modified.
 990
 991        * item: arvados.arvfile.ArvadosFile |
 992          arvados.collection.Subcollection --- The new contents at `name`
 993          within `collection`.
 994        """
 995        if self._callback:
 996            self._callback(event, collection, name, item)
 997        self.root_collection().notify(event, collection, name, item)
 998
 999    @synchronized
1000    def __eq__(self, other: Any) -> bool:
1001        """Indicate whether this collection object is equal to another"""
1002        if other is self:
1003            return True
1004        if not isinstance(other, RichCollectionBase):
1005            return False
1006        if len(self._items) != len(other):
1007            return False
1008        for k in self._items:
1009            if k not in other:
1010                return False
1011            if self._items[k] != other[k]:
1012                return False
1013        return True
1014
1015    def __ne__(self, other: Any) -> bool:
1016        """Indicate whether this collection object is not equal to another"""
1017        return not self.__eq__(other)
1018
1019    @synchronized
1020    def flush(self) -> None:
1021        """Upload any pending data to Keep"""
1022        for e in self.values():
1023            e.flush()
1024
1025
1026class Collection(RichCollectionBase):
1027    """Read and manipulate an Arvados collection
1028
1029    This class provides a high-level interface to create, read, and update
1030    Arvados collections and their contents. Refer to the Arvados Python SDK
1031    cookbook for [an introduction to using the Collection class][cookbook].
1032
1033    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1034    """
1035
1036    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1037                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1038                 keep_client: Optional['arvados.keep.KeepClient']=None,
1039                 num_retries: int=10,
1040                 parent: Optional['Collection']=None,
1041                 apiconfig: Optional[Mapping[str, str]]=None,
1042                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1043                 replication_desired: Optional[int]=None,
1044                 storage_classes_desired: Optional[List[str]]=None,
1045                 put_threads: Optional[int]=None):
1046        """Initialize a Collection object
1047
1048        Arguments:
1049
1050        * manifest_locator_or_text: str | None --- This string can contain a
1051          collection manifest text, portable data hash, or UUID. When given a
1052          portable data hash or UUID, this instance will load a collection
1053          record from the API server. Otherwise, this instance will represent a
1054          new collection without an API server record. The default value `None`
1055          instantiates a new collection with an empty manifest.
1056
1057        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1058          Arvados API client object this instance uses to make requests. If
1059          none is given, this instance creates its own client using the
1060          settings from `apiconfig` (see below). If your client instantiates
1061          many Collection objects, you can help limit memory utilization by
1062          calling `arvados.api.api` to construct an
1063          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1064          for every Collection.
1065
1066        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1067          object this instance uses to make requests. If none is given, this
1068          instance creates its own client using its `api_client`.
1069
1070        * num_retries: int --- The number of times that client requests are
1071          retried. Default 10.
1072
1073        * parent: arvados.collection.Collection | None --- The parent Collection
1074          object of this instance, if any. This argument is primarily used by
1075          other Collection methods; user client code shouldn't need to use it.
1076
1077        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1078          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1079          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1080          Collection object constructs one from these settings. If no
1081          mapping is provided, calls `arvados.config.settings` to get these
1082          parameters from user configuration.
1083
1084        * block_manager: arvados.arvfile._BlockManager | None --- The
1085          _BlockManager object used by this instance to coordinate reading
1086          and writing Keep data blocks. If none is given, this instance
1087          constructs its own. This argument is primarily used by other
1088          Collection methods; user client code shouldn't need to use it.
1089
1090        * replication_desired: int | None --- This controls both the value of
1091          the `replication_desired` field on API collection records saved by
1092          this class, as well as the number of Keep services that the object
1093          writes new data blocks to. If none is given, uses the default value
1094          configured for the cluster.
1095
1096        * storage_classes_desired: list[str] | None --- This controls both
1097          the value of the `storage_classes_desired` field on API collection
1098          records saved by this class, as well as selecting which specific
1099          Keep services the object writes new data blocks to. If none is
1100          given, defaults to an empty list.
1101
1102        * put_threads: int | None --- The number of threads to run
1103          simultaneously to upload data blocks to Keep. This value is used when
1104          building a new `block_manager`. It is unused when a `block_manager`
1105          is provided.
1106        """
1107
1108        if storage_classes_desired and type(storage_classes_desired) is not list:
1109            raise errors.ArgumentError("storage_classes_desired must be list type.")
1110
1111        super(Collection, self).__init__(parent)
1112        self._api_client = api_client
1113        self._keep_client = keep_client
1114
1115        # Use the keep client from ThreadSafeApiCache
1116        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1117            self._keep_client = self._api_client.keep
1118
1119        self._block_manager = block_manager
1120        self.replication_desired = replication_desired
1121        self._storage_classes_desired = storage_classes_desired
1122        self.put_threads = put_threads
1123
1124        if apiconfig:
1125            self._config = apiconfig
1126        else:
1127            self._config = config.settings()
1128
1129        self.num_retries = num_retries
1130        self._manifest_locator = None
1131        self._manifest_text = None
1132        self._portable_data_hash = None
1133        self._api_response = None
1134        self._past_versions = set()
1135
1136        self.lock = threading.RLock()
1137        self.events = None
1138
1139        if manifest_locator_or_text:
1140            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1141                self._manifest_locator = manifest_locator_or_text
1142            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1143                self._manifest_locator = manifest_locator_or_text
1144                if not self._has_local_collection_uuid():
1145                    self._has_remote_blocks = True
1146            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1147                self._manifest_text = manifest_locator_or_text
1148                if '+R' in self._manifest_text:
1149                    self._has_remote_blocks = True
1150            else:
1151                raise errors.ArgumentError(
1152                    "Argument to CollectionReader is not a manifest or a collection UUID")
1153
1154            try:
1155                self._populate()
1156            except errors.SyntaxError as e:
1157                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1158
1159    def storage_classes_desired(self) -> List[str]:
1160        """Get this collection's `storage_classes_desired` value"""
1161        return self._storage_classes_desired or []
1162
1163    def root_collection(self) -> 'Collection':
1164        return self
1165
1166    def get_properties(self) -> Properties:
1167        """Get this collection's properties
1168
1169        This method always returns a dict. If this collection object does not
1170        have an associated API record, or that record does not have any
1171        properties set, this method returns an empty dict.
1172        """
1173        if self._api_response and self._api_response["properties"]:
1174            return self._api_response["properties"]
1175        else:
1176            return {}
1177
1178    def get_trash_at(self) -> Optional[datetime.datetime]:
1179        """Get this collection's `trash_at` field
1180
1181        This method parses the `trash_at` field of the collection's API
1182        record and returns a datetime from it. If that field is not set, or
1183        this collection object does not have an associated API record,
1184        returns None.
1185        """
1186        if self._api_response and self._api_response["trash_at"]:
1187            try:
1188                return ciso8601.parse_datetime(self._api_response["trash_at"])
1189            except ValueError:
1190                return None
1191        else:
1192            return None
1193
1194    def stream_name(self) -> str:
1195        return "."
1196
1197    def writable(self) -> bool:
1198        return True
1199
1200    @synchronized
1201    def known_past_version(
1202            self,
1203            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1204    ) -> bool:
1205        """Indicate whether an API record for this collection has been seen before
1206
1207        As this collection object loads records from the API server, it records
1208        their `modified_at` and `portable_data_hash` fields. This method accepts
1209        a 2-tuple with values for those fields, and returns `True` if the
1210        combination was previously loaded.
1211        """
1212        return modified_at_and_portable_data_hash in self._past_versions
1213
1214    @synchronized
1215    @retry_method
1216    def update(
1217            self,
1218            other: Optional['Collection']=None,
1219            num_retries: Optional[int]=None,
1220    ) -> None:
1221        """Merge another collection's contents into this one
1222
1223        This method compares the manifest of this collection instance with
1224        another, then updates this instance's manifest with changes from the
1225        other, renaming files to flag conflicts where necessary.
1226
1227        When called without any arguments, this method reloads the collection's
1228        API record, and updates this instance with any changes that have
1229        appeared server-side. If this instance does not have a corresponding
1230        API record, this method raises `arvados.errors.ArgumentError`.
1231
1232        Arguments:
1233
1234        * other: arvados.collection.Collection | None --- The collection
1235          whose contents should be merged into this instance. When not
1236          provided, this method reloads this collection's API record and
1237          constructs a Collection object from it.  If this instance does not
1238          have a corresponding API record, this method raises
1239          `arvados.errors.ArgumentError`.
1240
1241        * num_retries: int | None --- The number of times to retry reloading
1242          the collection's API record from the API server. If not specified,
1243          uses the `num_retries` provided when this instance was constructed.
1244        """
1245        if other is None:
1246            if self._manifest_locator is None:
1247                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1248            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1249            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1250                response.get("portable_data_hash") != self.portable_data_hash()):
1251                # The record on the server is different from our current one, but we've seen it before,
1252                # so ignore it because it's already been merged.
1253                # However, if it's the same as our current record, proceed with the update, because we want to update
1254                # our tokens.
1255                return
1256            else:
1257                self._remember_api_response(response)
1258            other = CollectionReader(response["manifest_text"])
1259        baseline = CollectionReader(self._manifest_text)
1260        self.apply(baseline.diff(other))
1261        self._manifest_text = self.manifest_text()
1262
1263    @synchronized
1264    def _my_api(self):
1265        if self._api_client is None:
1266            self._api_client = ThreadSafeApiCache(self._config, version='v1')
1267            if self._keep_client is None:
1268                self._keep_client = self._api_client.keep
1269        return self._api_client
1270
1271    @synchronized
1272    def _my_keep(self):
1273        if self._keep_client is None:
1274            if self._api_client is None:
1275                self._my_api()
1276            else:
1277                self._keep_client = KeepClient(api_client=self._api_client)
1278        return self._keep_client
1279
1280    @synchronized
1281    def _my_block_manager(self):
1282        if self._block_manager is None:
1283            copies = (self.replication_desired or
1284                      self._my_api()._rootDesc.get('defaultCollectionReplication',
1285                                                   2))
1286            self._block_manager = _BlockManager(self._my_keep(),
1287                                                copies=copies,
1288                                                put_threads=self.put_threads,
1289                                                num_retries=self.num_retries,
1290                                                storage_classes_func=self.storage_classes_desired)
1291        return self._block_manager
1292
1293    def _remember_api_response(self, response):
1294        self._api_response = response
1295        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1296
1297    def _populate_from_api_server(self):
1298        # As in KeepClient itself, we must wait until the last
1299        # possible moment to instantiate an API client, in order to
1300        # avoid tripping up clients that don't have access to an API
1301        # server.  If we do build one, make sure our Keep client uses
1302        # it.  If instantiation fails, we'll fall back to the except
1303        # clause, just like any other Collection lookup
1304        # failure. Return an exception, or None if successful.
1305        self._remember_api_response(self._my_api().collections().get(
1306            uuid=self._manifest_locator).execute(
1307                num_retries=self.num_retries))
1308        self._manifest_text = self._api_response['manifest_text']
1309        self._portable_data_hash = self._api_response['portable_data_hash']
1310        # If not overriden via kwargs, we should try to load the
1311        # replication_desired and storage_classes_desired from the API server
1312        if self.replication_desired is None:
1313            self.replication_desired = self._api_response.get('replication_desired', None)
1314        if self._storage_classes_desired is None:
1315            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1316
1317    def _populate(self):
1318        if self._manifest_text is None:
1319            if self._manifest_locator is None:
1320                return
1321            else:
1322                self._populate_from_api_server()
1323        self._baseline_manifest = self._manifest_text
1324        self._import_manifest(self._manifest_text)
1325
1326    def _has_collection_uuid(self):
1327        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1328
1329    def _has_local_collection_uuid(self):
1330        return self._has_collection_uuid and \
1331            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1332
1333    def __enter__(self):
1334        return self
1335
1336    def __exit__(self, exc_type, exc_value, traceback):
1337        """Exit a context with this collection instance
1338
1339        If no exception was raised inside the context block, and this
1340        collection is writable and has a corresponding API record, that
1341        record will be updated to match the state of this instance at the end
1342        of the block.
1343        """
1344        if exc_type is None:
1345            if self.writable() and self._has_collection_uuid():
1346                self.save()
1347        self.stop_threads()
1348
1349    def stop_threads(self) -> None:
1350        """Stop background Keep upload/download threads"""
1351        if self._block_manager is not None:
1352            self._block_manager.stop_threads()
1353
1354    @synchronized
1355    def manifest_locator(self) -> Optional[str]:
1356        """Get this collection's manifest locator, if any
1357
1358        * If this collection instance is associated with an API record with a
1359          UUID, return that.
1360        * Otherwise, if this collection instance was loaded from an API record
1361          by portable data hash, return that.
1362        * Otherwise, return `None`.
1363        """
1364        return self._manifest_locator
1365
1366    @synchronized
1367    def clone(
1368            self,
1369            new_parent: Optional['Collection']=None,
1370            new_name: Optional[str]=None,
1371            readonly: bool=False,
1372            new_config: Optional[Mapping[str, str]]=None,
1373    ) -> 'Collection':
1374        """Create a Collection object with the same contents as this instance
1375
1376        This method creates a new Collection object with contents that match
1377        this instance's. The new collection will not be associated with any API
1378        record.
1379
1380        Arguments:
1381
1382        * new_parent: arvados.collection.Collection | None --- This value is
1383          passed to the new Collection's constructor as the `parent`
1384          argument.
1385
1386        * new_name: str | None --- This value is unused.
1387
1388        * readonly: bool --- If this value is true, this method constructs and
1389          returns a `CollectionReader`. Otherwise, it returns a mutable
1390          `Collection`. Default `False`.
1391
1392        * new_config: Mapping[str, str] | None --- This value is passed to the
1393          new Collection's constructor as `apiconfig`. If no value is provided,
1394          defaults to the configuration passed to this instance's constructor.
1395        """
1396        if new_config is None:
1397            new_config = self._config
1398        if readonly:
1399            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1400        else:
1401            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1402
1403        newcollection._clonefrom(self)
1404        return newcollection
1405
1406    @synchronized
1407    def api_response(self) -> Optional[Dict[str, Any]]:
1408        """Get this instance's associated API record
1409
1410        If this Collection instance has an associated API record, return it.
1411        Otherwise, return `None`.
1412        """
1413        return self._api_response
1414
1415    def find_or_create(
1416            self,
1417            path: str,
1418            create_type: CreateType,
1419    ) -> CollectionItem:
1420        if path == ".":
1421            return self
1422        else:
1423            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1424
1425    def find(self, path: str) -> CollectionItem:
1426        if path == ".":
1427            return self
1428        else:
1429            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1430
1431    def remove(self, path: str, recursive: bool=False) -> None:
1432        if path == ".":
1433            raise errors.ArgumentError("Cannot remove '.'")
1434        else:
1435            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1436
1437    @must_be_writable
1438    @synchronized
1439    @retry_method
1440    def save(
1441            self,
1442            properties: Optional[Properties]=None,
1443            storage_classes: Optional[StorageClasses]=None,
1444            trash_at: Optional[datetime.datetime]=None,
1445            merge: bool=True,
1446            num_retries: Optional[int]=None,
1447            preserve_version: bool=False,
1448    ) -> str:
1449        """Save collection to an existing API record
1450
1451        This method updates the instance's corresponding API record to match
1452        the instance's state. If this instance does not have a corresponding API
1453        record yet, raises `AssertionError`. (To create a new API record, use
1454        `Collection.save_new`.) This method returns the saved collection
1455        manifest.
1456
1457        Arguments:
1458
1459        * properties: dict[str, Any] | None --- If provided, the API record will
1460          be updated with these properties. Note this will completely replace
1461          any existing properties.
1462
1463        * storage_classes: list[str] | None --- If provided, the API record will
1464          be updated with this value in the `storage_classes_desired` field.
1465          This value will also be saved on the instance and used for any
1466          changes that follow.
1467
1468        * trash_at: datetime.datetime | None --- If provided, the API record
1469          will be updated with this value in the `trash_at` field.
1470
1471        * merge: bool --- If `True` (the default), this method will first
1472          reload this collection's API record, and merge any new contents into
1473          this instance before saving changes. See `Collection.update` for
1474          details.
1475
1476        * num_retries: int | None --- The number of times to retry reloading
1477          the collection's API record from the API server. If not specified,
1478          uses the `num_retries` provided when this instance was constructed.
1479
1480        * preserve_version: bool --- This value will be passed to directly
1481          to the underlying API call. If `True`, the Arvados API will
1482          preserve the versions of this collection both immediately before
1483          and after the update. If `True` when the API server is not
1484          configured with collection versioning, this method raises
1485          `arvados.errors.ArgumentError`.
1486        """
1487        if properties and type(properties) is not dict:
1488            raise errors.ArgumentError("properties must be dictionary type.")
1489
1490        if storage_classes and type(storage_classes) is not list:
1491            raise errors.ArgumentError("storage_classes must be list type.")
1492        if storage_classes:
1493            self._storage_classes_desired = storage_classes
1494
1495        if trash_at and type(trash_at) is not datetime.datetime:
1496            raise errors.ArgumentError("trash_at must be datetime type.")
1497
1498        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1499            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1500
1501        body={}
1502        if properties:
1503            body["properties"] = properties
1504        if self.storage_classes_desired():
1505            body["storage_classes_desired"] = self.storage_classes_desired()
1506        if trash_at:
1507            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1508            body["trash_at"] = t
1509        if preserve_version:
1510            body["preserve_version"] = preserve_version
1511
1512        if not self.committed():
1513            if self._has_remote_blocks:
1514                # Copy any remote blocks to the local cluster.
1515                self._copy_remote_blocks(remote_blocks={})
1516                self._has_remote_blocks = False
1517            if not self._has_collection_uuid():
1518                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1519            elif not self._has_local_collection_uuid():
1520                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1521
1522            self._my_block_manager().commit_all()
1523
1524            if merge:
1525                self.update()
1526
1527            text = self.manifest_text(strip=False)
1528            body['manifest_text'] = text
1529
1530            self._remember_api_response(self._my_api().collections().update(
1531                uuid=self._manifest_locator,
1532                body=body
1533                ).execute(num_retries=num_retries))
1534            self._manifest_text = self._api_response["manifest_text"]
1535            self._portable_data_hash = self._api_response["portable_data_hash"]
1536            self.set_committed(True)
1537        elif body:
1538            self._remember_api_response(self._my_api().collections().update(
1539                uuid=self._manifest_locator,
1540                body=body
1541                ).execute(num_retries=num_retries))
1542
1543        return self._manifest_text
1544
1545
1546    @must_be_writable
1547    @synchronized
1548    @retry_method
1549    def save_new(
1550            self,
1551            name: Optional[str]=None,
1552            create_collection_record: bool=True,
1553            owner_uuid: Optional[str]=None,
1554            properties: Optional[Properties]=None,
1555            storage_classes: Optional[StorageClasses]=None,
1556            trash_at: Optional[datetime.datetime]=None,
1557            ensure_unique_name: bool=False,
1558            num_retries: Optional[int]=None,
1559            preserve_version: bool=False,
1560    ):
1561        """Save collection to a new API record
1562
1563        This method finishes uploading new data blocks and (optionally)
1564        creates a new API collection record with the provided data. If a new
1565        record is created, this instance becomes associated with that record
1566        for future updates like `save()`. This method returns the saved
1567        collection manifest.
1568
1569        Arguments:
1570
1571        * name: str | None --- The `name` field to use on the new collection
1572          record. If not specified, a generic default name is generated.
1573
1574        * create_collection_record: bool --- If `True` (the default), creates a
1575          collection record on the API server. If `False`, the method finishes
1576          all data uploads and only returns the resulting collection manifest
1577          without sending it to the API server.
1578
1579        * owner_uuid: str | None --- The `owner_uuid` field to use on the
1580          new collection record.
1581
1582        * properties: dict[str, Any] | None --- The `properties` field to use on
1583          the new collection record.
1584
1585        * storage_classes: list[str] | None --- The
1586          `storage_classes_desired` field to use on the new collection record.
1587
1588        * trash_at: datetime.datetime | None --- The `trash_at` field to use
1589          on the new collection record.
1590
1591        * ensure_unique_name: bool --- This value is passed directly to the
1592          Arvados API when creating the collection record. If `True`, the API
1593          server may modify the submitted `name` to ensure the collection's
1594          `name`+`owner_uuid` combination is unique. If `False` (the default),
1595          if a collection already exists with this same `name`+`owner_uuid`
1596          combination, creating a collection record will raise a validation
1597          error.
1598
1599        * num_retries: int | None --- The number of times to retry reloading
1600          the collection's API record from the API server. If not specified,
1601          uses the `num_retries` provided when this instance was constructed.
1602
1603        * preserve_version: bool --- This value will be passed to directly
1604          to the underlying API call. If `True`, the Arvados API will
1605          preserve the versions of this collection both immediately before
1606          and after the update. If `True` when the API server is not
1607          configured with collection versioning, this method raises
1608          `arvados.errors.ArgumentError`.
1609        """
1610        if properties and type(properties) is not dict:
1611            raise errors.ArgumentError("properties must be dictionary type.")
1612
1613        if storage_classes and type(storage_classes) is not list:
1614            raise errors.ArgumentError("storage_classes must be list type.")
1615
1616        if trash_at and type(trash_at) is not datetime.datetime:
1617            raise errors.ArgumentError("trash_at must be datetime type.")
1618
1619        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1620            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1621
1622        if self._has_remote_blocks:
1623            # Copy any remote blocks to the local cluster.
1624            self._copy_remote_blocks(remote_blocks={})
1625            self._has_remote_blocks = False
1626
1627        if storage_classes:
1628            self._storage_classes_desired = storage_classes
1629
1630        self._my_block_manager().commit_all()
1631        text = self.manifest_text(strip=False)
1632
1633        if create_collection_record:
1634            if name is None:
1635                name = "New collection"
1636                ensure_unique_name = True
1637
1638            body = {"manifest_text": text,
1639                    "name": name,
1640                    "replication_desired": self.replication_desired}
1641            if owner_uuid:
1642                body["owner_uuid"] = owner_uuid
1643            if properties:
1644                body["properties"] = properties
1645            if self.storage_classes_desired():
1646                body["storage_classes_desired"] = self.storage_classes_desired()
1647            if trash_at:
1648                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1649                body["trash_at"] = t
1650            if preserve_version:
1651                body["preserve_version"] = preserve_version
1652
1653            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1654            text = self._api_response["manifest_text"]
1655
1656            self._manifest_locator = self._api_response["uuid"]
1657            self._portable_data_hash = self._api_response["portable_data_hash"]
1658
1659            self._manifest_text = text
1660            self.set_committed(True)
1661
1662        return text
1663
1664    _token_re = re.compile(r'(\S+)(\s+|$)')
1665    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1666    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1667
1668    def _unescape_manifest_path(self, path):
1669        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1670
1671    @synchronized
1672    def _import_manifest(self, manifest_text):
1673        """Import a manifest into a `Collection`.
1674
1675        :manifest_text:
1676          The manifest text to import from.
1677
1678        """
1679        if len(self) > 0:
1680            raise ArgumentError("Can only import manifest into an empty collection")
1681
1682        STREAM_NAME = 0
1683        BLOCKS = 1
1684        SEGMENTS = 2
1685
1686        stream_name = None
1687        state = STREAM_NAME
1688
1689        for token_and_separator in self._token_re.finditer(manifest_text):
1690            tok = token_and_separator.group(1)
1691            sep = token_and_separator.group(2)
1692
1693            if state == STREAM_NAME:
1694                # starting a new stream
1695                stream_name = self._unescape_manifest_path(tok)
1696                blocks = []
1697                segments = []
1698                streamoffset = 0
1699                state = BLOCKS
1700                self.find_or_create(stream_name, COLLECTION)
1701                continue
1702
1703            if state == BLOCKS:
1704                block_locator = self._block_re.match(tok)
1705                if block_locator:
1706                    blocksize = int(block_locator.group(1))
1707                    blocks.append(Range(tok, streamoffset, blocksize, 0))
1708                    streamoffset += blocksize
1709                else:
1710                    state = SEGMENTS
1711
1712            if state == SEGMENTS:
1713                file_segment = self._segment_re.match(tok)
1714                if file_segment:
1715                    pos = int(file_segment.group(1))
1716                    size = int(file_segment.group(2))
1717                    name = self._unescape_manifest_path(file_segment.group(3))
1718                    if name.split('/')[-1] == '.':
1719                        # placeholder for persisting an empty directory, not a real file
1720                        if len(name) > 2:
1721                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1722                    else:
1723                        filepath = os.path.join(stream_name, name)
1724                        try:
1725                            afile = self.find_or_create(filepath, FILE)
1726                        except IOError as e:
1727                            if e.errno == errno.ENOTDIR:
1728                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1729                            else:
1730                                raise e from None
1731                        if isinstance(afile, ArvadosFile):
1732                            afile.add_segment(blocks, pos, size)
1733                        else:
1734                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1735                else:
1736                    # error!
1737                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1738
1739            if sep == "\n":
1740                stream_name = None
1741                state = STREAM_NAME
1742
1743        self.set_committed(True)
1744
1745    @synchronized
1746    def notify(
1747            self,
1748            event: ChangeType,
1749            collection: 'RichCollectionBase',
1750            name: str,
1751            item: CollectionItem,
1752    ) -> None:
1753        if self._callback:
1754            self._callback(event, collection, name, item)
1755
1756
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) within an Arvados
    `Collection`. It is returned by `Collection.find` and provides the same API.
    Operations that work on the API collection record propagate to the parent
    `Collection` object.
    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        # Reuse the root collection's lock rather than creating a new one,
        # presumably so @synchronized methods across the tree share one lock.
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self) -> 'Collection':
        """Return the root collection by asking this subcollection's parent"""
        return self.parent.root_collection()

    def writable(self) -> bool:
        """Return whether the root collection is writable"""
        return self.root_collection().writable()

    def _my_api(self):
        # API client, keep client and block manager all come from the root.
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self) -> str:
        """Return this stream's path: the parent's stream name joined with `name`"""
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Copy this subcollection as `new_name` under `new_parent`

        This method only constructs the copy and fills it via `_clonefrom`;
        it does not add the copy to `new_parent`'s items. Although
        `new_parent` defaults to `None`, the constructor dereferences its
        parent, so a parent must be supplied in practice.
        """
        c = Subcollection(new_parent, new_name)
        c._clonefrom(self)
        return c

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Move this subcollection under `newparent` as `newname`

        Marks it uncommitted, calls `flush()`, removes it from its current
        parent, then rebinds `parent`, `name`, and `lock` to the new
        location. Inserting it into `newparent`'s items is left to the
        caller.
        """
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Return manifest text for this subcollection

        An empty subcollection is encoded as one stream line holding the empty
        block locator and a zero-length file named ``\\056`` (the escaped form
        of "."), so the empty directory persists in the manifest. Otherwise
        the base-class implementation is used.
        """
        if len(self._items) == 0:
            return "%s %s 0:0:\\056\n" % (
                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
        return super(Subcollection, self)._get_manifest_text(stream_name,
                                                             strip, normalize,
                                                             only_committed)
1820
1821
class CollectionReader(Collection):
    """Read-only `Collection` subclass

    Instances never create or update API collection records. Use this class
    for additional code safety when you only need to read existing
    collections.
    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # `writable()` reports `_in_init`, so the object counts as writable
        # only while the parent constructor runs (presumably so it can load
        # the manifest through methods guarded by `@must_be_writable`).
        self._in_init = True
        super().__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Contents never change after initialization, so skip locking.
        self.lock = NoopLock()

        # Kept for backwards compatibility with the old CollectionReader's
        # all_streams() and all_files().
        self._streams = None

    def writable(self) -> bool:
        """Return `True` only while `__init__` is still running"""
        in_init = self._in_init
        return in_init
ADD = 'add'

Argument value for Collection methods to represent an added item

DEL = 'del'

Argument value for Collection methods to represent a removed item

MOD = 'mod'

Argument value for Collection methods to represent a modified item

TOK = 'tok'

Argument value for Collection methods to represent an item with token differences

FILE = 'file'

create_type value for Collection.find_or_create that creates a file

COLLECTION = 'collection'

create_type value for Collection.find_or_create that creates a subcollection (stream)

ChangeList = typing.List[typing.Union[typing.Tuple[typing.Literal['add', 'del'], str, ForwardRef('Collection')], typing.Tuple[typing.Literal['mod', 'tok'], str, ForwardRef('Collection'), ForwardRef('Collection')]]]
ChangeType = typing.Literal['add', 'del', 'mod', 'tok']
CollectionItem = typing.Union[arvados.arvfile.ArvadosFile, ForwardRef('Collection')]
ChangeCallback = typing.Callable[[typing.Literal['add', 'del', 'mod', 'tok'], ForwardRef('Collection'), str, typing.Union[arvados.arvfile.ArvadosFile, ForwardRef('Collection')]], object]
CreateType = typing.Literal['collection', 'file']
Properties = typing.Dict[str, typing.Any]
StorageClasses = typing.List[str]
class CollectionBase:
 87class CollectionBase(object):
 88    """Abstract base class for Collection classes
 89
 90    .. ATTENTION:: Internal
 91       This class is meant to be used by other parts of the SDK. User code
 92       should instantiate or subclass `Collection` or one of its subclasses
 93       directly.
 94    """
 95
 96    def __enter__(self):
 97        """Enter a context block with this collection instance"""
 98        return self
 99
100    def __exit__(self, exc_type, exc_value, traceback):
101        """Exit a context block with this collection instance"""
102        pass
103
104    def _my_keep(self):
105        if self._keep_client is None:
106            self._keep_client = KeepClient(api_client=self._api_client,
107                                           num_retries=self.num_retries)
108        return self._keep_client
109
110    def stripped_manifest(self) -> str:
111        """Create a copy of the collection manifest with only size hints
112
113        This method returns a string with the current collection's manifest
114        text with all non-portable locator hints like permission hints and
115        remote cluster hints removed. The only hints in the returned manifest
116        will be size hints.
117        """
118        raw = self.manifest_text()
119        clean = []
120        for line in raw.split("\n"):
121            fields = line.split()
122            if fields:
123                clean_fields = fields[:1] + [
124                    (re.sub(r'\+[^\d][^\+]*', '', x)
125                     if re.match(arvados.util.keep_locator_pattern, x)
126                     else x)
127                    for x in fields[1:]]
128                clean += [' '.join(clean_fields), "\n"]
129        return ''.join(clean)

Abstract base class for Collection classes

def stripped_manifest(self) -> str:
110    def stripped_manifest(self) -> str:
111        """Create a copy of the collection manifest with only size hints
112
113        This method returns a string with the current collection's manifest
114        text with all non-portable locator hints like permission hints and
115        remote cluster hints removed. The only hints in the returned manifest
116        will be size hints.
117        """
118        raw = self.manifest_text()
119        clean = []
120        for line in raw.split("\n"):
121            fields = line.split()
122            if fields:
123                clean_fields = fields[:1] + [
124                    (re.sub(r'\+[^\d][^\+]*', '', x)
125                     if re.match(arvados.util.keep_locator_pattern, x)
126                     else x)
127                    for x in fields[1:]]
128                clean += [' '.join(clean_fields), "\n"]
129        return ''.join(clean)

Create a copy of the collection manifest with only size hints

This method returns a string with the current collection’s manifest text with all non-portable locator hints like permission hints and remote cluster hints removed. The only hints in the returned manifest will be size hints.

class RichCollectionBase(CollectionBase):
 155class RichCollectionBase(CollectionBase):
 156    """Base class for Collection classes
 157
 158    .. ATTENTION:: Internal
 159       This class is meant to be used by other parts of the SDK. User code
 160       should instantiate or subclass `Collection` or one of its subclasses
 161       directly.
 162    """
 163
 164    def __init__(self, parent=None):
 165        self.parent = parent
 166        self._committed = False
 167        self._has_remote_blocks = False
 168        self._callback = None
 169        self._items = {}
 170
 171    def _my_api(self):
 172        raise NotImplementedError()
 173
 174    def _my_keep(self):
 175        raise NotImplementedError()
 176
 177    def _my_block_manager(self):
 178        raise NotImplementedError()
 179
 180    def writable(self) -> bool:
 181        """Indicate whether this collection object can be modified
 182
 183        This method returns `False` if this object is a `CollectionReader`,
 184        else `True`.
 185        """
 186        raise NotImplementedError()
 187
 188    def root_collection(self) -> 'Collection':
 189        """Get this collection's root collection object
 190
 191        If you open a subcollection with `Collection.find`, calling this method
 192        on that subcollection returns the source Collection object.
 193        """
 194        raise NotImplementedError()
 195
 196    def stream_name(self) -> str:
 197        """Get the name of the manifest stream represented by this collection
 198
 199        If you open a subcollection with `Collection.find`, calling this method
 200        on that subcollection returns the name of the stream you opened.
 201        """
 202        raise NotImplementedError()
 203
 204    @synchronized
 205    def has_remote_blocks(self) -> bool:
 206        """Indiciate whether the collection refers to remote data
 207
 208        Returns `True` if the collection manifest includes any Keep locators
 209        with a remote hint (`+R`), else `False`.
 210        """
 211        if self._has_remote_blocks:
 212            return True
 213        for item in self:
 214            if self[item].has_remote_blocks():
 215                return True
 216        return False
 217
 218    @synchronized
 219    def set_has_remote_blocks(self, val: bool) -> None:
 220        """Cache whether this collection refers to remote blocks
 221
 222        .. ATTENTION:: Internal
 223           This method is only meant to be used by other Collection methods.
 224
 225        Set this collection's cached "has remote blocks" flag to the given
 226        value.
 227        """
 228        self._has_remote_blocks = val
 229        if self.parent:
 230            self.parent.set_has_remote_blocks(val)
 231
 232    @must_be_writable
 233    @synchronized
 234    def find_or_create(
 235            self,
 236            path: str,
 237            create_type: CreateType,
 238    ) -> CollectionItem:
 239        """Get the item at the given path, creating it if necessary
 240
 241        If `path` refers to a stream in this collection, returns a
 242        corresponding `Subcollection` object. If `path` refers to a file in
 243        this collection, returns a corresponding
 244        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 245        this collection, then this method creates a new object and returns
 246        it, creating parent streams as needed. The type of object created is
 247        determined by the value of `create_type`.
 248
 249        Arguments:
 250
 251        * path: str --- The path to find or create within this collection.
 252
 253        * create_type: Literal[COLLECTION, FILE] --- The type of object to
 254          create at `path` if one does not exist. Passing `COLLECTION`
 255          creates a stream and returns the corresponding
 256          `Subcollection`. Passing `FILE` creates a new file and returns the
 257          corresponding `arvados.arvfile.ArvadosFile`.
 258        """
 259        pathcomponents = path.split("/", 1)
 260        if pathcomponents[0]:
 261            item = self._items.get(pathcomponents[0])
 262            if len(pathcomponents) == 1:
 263                if item is None:
 264                    # create new file
 265                    if create_type == COLLECTION:
 266                        item = Subcollection(self, pathcomponents[0])
 267                    else:
 268                        item = ArvadosFile(self, pathcomponents[0])
 269                    self._items[pathcomponents[0]] = item
 270                    self.set_committed(False)
 271                    self.notify(ADD, self, pathcomponents[0], item)
 272                return item
 273            else:
 274                if item is None:
 275                    # create new collection
 276                    item = Subcollection(self, pathcomponents[0])
 277                    self._items[pathcomponents[0]] = item
 278                    self.set_committed(False)
 279                    self.notify(ADD, self, pathcomponents[0], item)
 280                if isinstance(item, RichCollectionBase):
 281                    return item.find_or_create(pathcomponents[1], create_type)
 282                else:
 283                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 284        else:
 285            return self
 286
 287    @synchronized
 288    def find(self, path: str) -> CollectionItem:
 289        """Get the item at the given path
 290
 291        If `path` refers to a stream in this collection, returns a
 292        corresponding `Subcollection` object. If `path` refers to a file in
 293        this collection, returns a corresponding
 294        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 295        this collection, then this method raises `NotADirectoryError`.
 296
 297        Arguments:
 298
 299        * path: str --- The path to find or create within this collection.
 300        """
 301        if not path:
 302            raise errors.ArgumentError("Parameter 'path' is empty.")
 303
 304        pathcomponents = path.split("/", 1)
 305        if pathcomponents[0] == '':
 306            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 307
 308        item = self._items.get(pathcomponents[0])
 309        if item is None:
 310            return None
 311        elif len(pathcomponents) == 1:
 312            return item
 313        else:
 314            if isinstance(item, RichCollectionBase):
 315                if pathcomponents[1]:
 316                    return item.find(pathcomponents[1])
 317                else:
 318                    return item
 319            else:
 320                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 321
 322    @synchronized
 323    def mkdirs(self, path: str) -> 'Subcollection':
 324        """Create and return a subcollection at `path`
 325
 326        If `path` exists within this collection, raises `FileExistsError`.
 327        Otherwise, creates a stream at that path and returns the
 328        corresponding `Subcollection`.
 329        """
 330        if self.find(path) != None:
 331            raise IOError(errno.EEXIST, "Directory or file exists", path)
 332
 333        return self.find_or_create(path, COLLECTION)
 334
 335    def open(
 336            self,
 337            path: str,
 338            mode: str="r",
 339            encoding: Optional[str]=None
 340    ) -> IO:
 341        """Open a file-like object within the collection
 342
 343        This method returns a file-like object that can read and/or write the
 344        file located at `path` within the collection. If you attempt to write
 345        a `path` that does not exist, the file is created with `find_or_create`.
 346        If the file cannot be opened for any other reason, this method raises
 347        `OSError` with an appropriate errno.
 348
 349        Arguments:
 350
 351        * path: str --- The path of the file to open within this collection
 352
 353        * mode: str --- The mode to open this file. Supports all the same
 354          values as `builtins.open`.
 355
 356        * encoding: str | None --- The text encoding of the file. Only used
 357          when the file is opened in text mode. The default is
 358          platform-dependent.
 359
 360        """
 361        if not re.search(r'^[rwa][bt]?\+?$', mode):
 362            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 363
 364        if mode[0] == 'r' and '+' not in mode:
 365            fclass = ArvadosFileReader
 366            arvfile = self.find(path)
 367        elif not self.writable():
 368            raise IOError(errno.EROFS, "Collection is read only")
 369        else:
 370            fclass = ArvadosFileWriter
 371            arvfile = self.find_or_create(path, FILE)
 372
 373        if arvfile is None:
 374            raise IOError(errno.ENOENT, "File not found", path)
 375        if not isinstance(arvfile, ArvadosFile):
 376            raise IOError(errno.EISDIR, "Is a directory", path)
 377
 378        if mode[0] == 'w':
 379            arvfile.truncate(0)
 380
 381        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
 382        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
 383        if 'b' not in mode:
 384            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
 385            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
 386        return f
 387
 388    def modified(self) -> bool:
 389        """Indicate whether this collection has an API server record
 390
 391        Returns `False` if this collection corresponds to a record loaded from
 392        the API server, `True` otherwise.
 393        """
 394        return not self.committed()
 395
 396    @synchronized
 397    def committed(self):
 398        """Indicate whether this collection has an API server record
 399
 400        Returns `True` if this collection corresponds to a record loaded from
 401        the API server, `False` otherwise.
 402        """
 403        return self._committed
 404
 405    @synchronized
 406    def set_committed(self, value: bool=True):
 407        """Cache whether this collection has an API server record
 408
 409        .. ATTENTION:: Internal
 410           This method is only meant to be used by other Collection methods.
 411
 412        Set this collection's cached "committed" flag to the given
 413        value and propagate it as needed.
 414        """
 415        if value == self._committed:
 416            return
 417        if value:
 418            for k,v in self._items.items():
 419                v.set_committed(True)
 420            self._committed = True
 421        else:
 422            self._committed = False
 423            if self.parent is not None:
 424                self.parent.set_committed(False)
 425
 426    @synchronized
 427    def __iter__(self) -> Iterator[str]:
 428        """Iterate names of streams and files in this collection
 429
 430        This method does not recurse. It only iterates the contents of this
 431        collection's corresponding stream.
 432        """
 433        return iter(self._items)
 434
 435    @synchronized
 436    def __getitem__(self, k: str) -> CollectionItem:
 437        """Get an `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
 438
 439        This method does not recurse. If you want to search a path, use
 440        `RichCollectionBase.find` instead.
 441        """
 442        return self._items[k]
 443
 444    @synchronized
 445    def __contains__(self, k: str) -> bool:
 446        """Indicate whether this collection has an item with this name
 447
 448        This method does not recurse. If you want to check a path, use
 449        `RichCollectionBase.exists` instead.
 450        """
 451        return k in self._items
 452
 453    @synchronized
 454    def __len__(self):
 455        """Get the number of items directly contained in this collection
 456
 457        This method does not recurse. It only counts the streams and files
 458        in this collection's corresponding stream.
 459        """
 460        return len(self._items)
 461
 462    @must_be_writable
 463    @synchronized
 464    def __delitem__(self, p: str) -> None:
 465        """Delete an item from this collection's stream
 466
 467        This method does not recurse. If you want to remove an item by a
 468        path, use `RichCollectionBase.remove` instead.
 469        """
 470        del self._items[p]
 471        self.set_committed(False)
 472        self.notify(DEL, self, p, None)
 473
 474    @synchronized
 475    def keys(self) -> Iterator[str]:
 476        """Iterate names of streams and files in this collection
 477
 478        This method does not recurse. It only iterates the contents of this
 479        collection's corresponding stream.
 480        """
 481        return self._items.keys()
 482
 483    @synchronized
 484    def values(self) -> List[CollectionItem]:
 485        """Get a list of objects in this collection's stream
 486
 487        The return value includes a `Subcollection` for every stream, and an
 488        `arvados.arvfile.ArvadosFile` for every file, directly within this
 489        collection's stream.  This method does not recurse.
 490        """
 491        return list(self._items.values())
 492
 493    @synchronized
 494    def items(self) -> List[Tuple[str, CollectionItem]]:
 495        """Get a list of `(name, object)` tuples from this collection's stream
 496
 497        The return value includes a `Subcollection` for every stream, and an
 498        `arvados.arvfile.ArvadosFile` for every file, directly within this
 499        collection's stream.  This method does not recurse.
 500        """
 501        return list(self._items.items())
 502
 503    def exists(self, path: str) -> bool:
 504        """Indicate whether this collection includes an item at `path`
 505
 506        This method returns `True` if `path` refers to a stream or file within
 507        this collection, else `False`.
 508
 509        Arguments:
 510
 511        * path: str --- The path to check for existence within this collection
 512        """
 513        return self.find(path) is not None
 514
 515    @must_be_writable
 516    @synchronized
 517    def remove(self, path: str, recursive: bool=False) -> None:
 518        """Remove the file or stream at `path`
 519
 520        Arguments:
 521
 522        * path: str --- The path of the item to remove from the collection
 523
 524        * recursive: bool --- Controls the method's behavior if `path` refers
 525          to a nonempty stream. If `False` (the default), this method raises
 526          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
 527          items under the stream.
 528        """
 529        if not path:
 530            raise errors.ArgumentError("Parameter 'path' is empty.")
 531
 532        pathcomponents = path.split("/", 1)
 533        item = self._items.get(pathcomponents[0])
 534        if item is None:
 535            raise IOError(errno.ENOENT, "File not found", path)
 536        if len(pathcomponents) == 1:
 537            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
 538                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
 539            deleteditem = self._items[pathcomponents[0]]
 540            del self._items[pathcomponents[0]]
 541            self.set_committed(False)
 542            self.notify(DEL, self, pathcomponents[0], deleteditem)
 543        else:
 544            item.remove(pathcomponents[1], recursive=recursive)
 545
 546    def _clonefrom(self, source):
 547        for k,v in source.items():
 548            self._items[k] = v.clone(self, k)
 549
 550    def clone(self):
 551        raise NotImplementedError()
 552
 553    @must_be_writable
 554    @synchronized
 555    def add(
 556            self,
 557            source_obj: CollectionItem,
 558            target_name: str,
 559            overwrite: bool=False,
 560            reparent: bool=False,
 561    ) -> None:
 562        """Copy or move a file or subcollection object to this collection
 563
 564        Arguments:
 565
 566        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
 567          to add to this collection
 568
 569        * target_name: str --- The path inside this collection where
 570          `source_obj` should be added.
 571
 572        * overwrite: bool --- Controls the behavior of this method when the
 573          collection already contains an object at `target_name`. If `False`
 574          (the default), this method will raise `FileExistsError`. If `True`,
 575          the object at `target_name` will be replaced with `source_obj`.
 576
 577        * reparent: bool --- Controls whether this method copies or moves
 578          `source_obj`. If `False` (the default), `source_obj` is copied into
 579          this collection. If `True`, `source_obj` is moved into this
 580          collection.
 581        """
 582        if target_name in self and not overwrite:
 583            raise IOError(errno.EEXIST, "File already exists", target_name)
 584
 585        modified_from = None
 586        if target_name in self:
 587            modified_from = self[target_name]
 588
 589        # Actually make the move or copy.
 590        if reparent:
 591            source_obj._reparent(self, target_name)
 592            item = source_obj
 593        else:
 594            item = source_obj.clone(self, target_name)
 595
 596        self._items[target_name] = item
 597        self.set_committed(False)
 598        if not self._has_remote_blocks and source_obj.has_remote_blocks():
 599            self.set_has_remote_blocks(True)
 600
 601        if modified_from:
 602            self.notify(MOD, self, target_name, (modified_from, item))
 603        else:
 604            self.notify(ADD, self, target_name, item)
 605
 606    def _get_src_target(self, source, target_path, source_collection, create_dest):
 607        if source_collection is None:
 608            source_collection = self
 609
 610        # Find the object
 611        if isinstance(source, str):
 612            source_obj = source_collection.find(source)
 613            if source_obj is None:
 614                raise IOError(errno.ENOENT, "File not found", source)
 615            sourcecomponents = source.split("/")
 616        else:
 617            source_obj = source
 618            sourcecomponents = None
 619
 620        # Find parent collection the target path
 621        targetcomponents = target_path.split("/")
 622
 623        # Determine the name to use.
 624        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 625
 626        if not target_name:
 627            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 628
 629        if create_dest:
 630            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 631        else:
 632            if len(targetcomponents) > 1:
 633                target_dir = self.find("/".join(targetcomponents[0:-1]))
 634            else:
 635                target_dir = self
 636
 637        if target_dir is None:
 638            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 639
 640        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
 641            target_dir = target_dir[target_name]
 642            target_name = sourcecomponents[-1]
 643
 644        return (source_obj, target_dir, target_name)
 645
 646    @must_be_writable
 647    @synchronized
 648    def copy(
 649            self,
 650            source: Union[str, CollectionItem],
 651            target_path: str,
 652            source_collection: Optional['RichCollectionBase']=None,
 653            overwrite: bool=False,
 654    ) -> None:
 655        """Copy a file or subcollection object to this collection
 656
 657        Arguments:
 658
 659        * source: str | arvados.arvfile.ArvadosFile |
 660          arvados.collection.Subcollection --- The file or subcollection to
 661          add to this collection. If `source` is a str, the object will be
 662          found by looking up this path from `source_collection` (see
 663          below).
 664
 665        * target_path: str --- The path inside this collection where the
 666          source object should be added.
 667
 668        * source_collection: arvados.collection.Collection | None --- The
 669          collection to find the source object from when `source` is a
 670          path. Defaults to the current collection (`self`).
 671
 672        * overwrite: bool --- Controls the behavior of this method when the
 673          collection already contains an object at `target_path`. If `False`
 674          (the default), this method will raise `FileExistsError`. If `True`,
 675          the object at `target_path` will be replaced with `source_obj`.
 676        """
 677        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 678        target_dir.add(source_obj, target_name, overwrite, False)
 679
 680    @must_be_writable
 681    @synchronized
 682    def rename(
 683            self,
 684            source: Union[str, CollectionItem],
 685            target_path: str,
 686            source_collection: Optional['RichCollectionBase']=None,
 687            overwrite: bool=False,
 688    ) -> None:
 689        """Move a file or subcollection object to this collection
 690
 691        Arguments:
 692
 693        * source: str | arvados.arvfile.ArvadosFile |
 694          arvados.collection.Subcollection --- The file or subcollection to
 695          add to this collection. If `source` is a str, the object will be
 696          found by looking up this path from `source_collection` (see
 697          below).
 698
 699        * target_path: str --- The path inside this collection where the
 700          source object should be added.
 701
 702        * source_collection: arvados.collection.Collection | None --- The
 703          collection to find the source object from when `source` is a
 704          path. Defaults to the current collection (`self`).
 705
 706        * overwrite: bool --- Controls the behavior of this method when the
 707          collection already contains an object at `target_path`. If `False`
 708          (the default), this method will raise `FileExistsError`. If `True`,
 709          the object at `target_path` will be replaced with `source_obj`.
 710        """
 711        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 712        if not source_obj.writable():
 713            raise IOError(errno.EROFS, "Source collection is read only", source)
 714        target_dir.add(source_obj, target_name, overwrite, True)
 715
 716    def portable_manifest_text(self, stream_name: str=".") -> str:
 717        """Get the portable manifest text for this collection
 718
 719        The portable manifest text is normalized, and does not include access
 720        tokens. This method does not flush outstanding blocks to Keep.
 721
 722        Arguments:
 723
 724        * stream_name: str --- The name to use for this collection's stream in
 725          the generated manifest. Default `'.'`.
 726        """
 727        return self._get_manifest_text(stream_name, True, True)
 728
 729    @synchronized
 730    def manifest_text(
 731            self,
 732            stream_name: str=".",
 733            strip: bool=False,
 734            normalize: bool=False,
 735            only_committed: bool=False,
 736    ) -> str:
 737        """Get the manifest text for this collection
 738
 739        Arguments:
 740
 741        * stream_name: str --- The name to use for this collection's stream in
 742          the generated manifest. Default `'.'`.
 743
 744        * strip: bool --- Controls whether or not the returned manifest text
 745          includes access tokens. If `False` (the default), the manifest text
 746          will include access tokens. If `True`, the manifest text will not
 747          include access tokens.
 748
 749        * normalize: bool --- Controls whether or not the returned manifest
 750          text is normalized. Default `False`.
 751
 752        * only_committed: bool --- Controls whether or not this method uploads
 753          pending data to Keep before building and returning the manifest text.
 754          If `False` (the default), this method will finish uploading all data
 755          to Keep, then return the final manifest. If `True`, this method will
 756          build and return a manifest that only refers to the data that has
 757          finished uploading at the time this method was called.
 758        """
 759        if not only_committed:
 760            self._my_block_manager().commit_all()
 761        return self._get_manifest_text(stream_name, strip, normalize,
 762                                       only_committed=only_committed)
 763
 764    @synchronized
 765    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
 766        """Get the manifest text for this collection, subcollections and files.
 767
 768        :stream_name:
 769          Name to use for this stream (directory)
 770
 771        :strip:
 772          If True, remove signing tokens from block locators if present.
 773          If False (default), block locators are left unchanged.
 774
 775        :normalize:
 776          If True, always export the manifest text in normalized form
 777          even if the Collection is not modified.  If False (default) and the collection
 778          is not modified, return the original manifest text even if it is not
 779          in normalized form.
 780
 781        :only_committed:
 782          If True, only include blocks that were already committed to Keep.
 783
 784        """
 785
 786        if not self.committed() or self._manifest_text is None or normalize:
 787            stream = {}
 788            buf = []
 789            sorted_keys = sorted(self.keys())
 790            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
 791                # Create a stream per file `k`
 792                arvfile = self[filename]
 793                filestream = []
 794                for segment in arvfile.segments():
 795                    loc = segment.locator
 796                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
 797                        if only_committed:
 798                            continue
 799                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
 800                    if strip:
 801                        loc = KeepLocator(loc).stripped()
 802                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
 803                                         segment.segment_offset, segment.range_size))
 804                stream[filename] = filestream
 805            if stream:
 806                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
 807            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
 808                buf.append(self[dirname].manifest_text(
 809                    stream_name=os.path.join(stream_name, dirname),
 810                    strip=strip, normalize=True, only_committed=only_committed))
 811            return "".join(buf)
 812        else:
 813            if strip:
 814                return self.stripped_manifest()
 815            else:
 816                return self._manifest_text
 817
 818    @synchronized
 819    def _copy_remote_blocks(self, remote_blocks={}):
 820        """Scan through the entire collection and ask Keep to copy remote blocks.
 821
 822        When accessing a remote collection, blocks will have a remote signature
 823        (+R instead of +A). Collect these signatures and request Keep to copy the
 824        blocks to the local cluster, returning local (+A) signatures.
 825
 826        :remote_blocks:
 827          Shared cache of remote to local block mappings. This is used to avoid
 828          doing extra work when blocks are shared by more than one file in
 829          different subdirectories.
 830
 831        """
 832        for item in self:
 833            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
 834        return remote_blocks
 835
 836    @synchronized
 837    def diff(
 838            self,
 839            end_collection: 'RichCollectionBase',
 840            prefix: str=".",
 841            holding_collection: Optional['Collection']=None,
 842    ) -> ChangeList:
 843        """Build a list of differences between this collection and another
 844
 845        Arguments:
 846
 847        * end_collection: arvados.collection.RichCollectionBase --- A
 848          collection object with the desired end state. The returned diff
 849          list will describe how to go from the current collection object
 850          `self` to `end_collection`.
 851
 852        * prefix: str --- The name to use for this collection's stream in
 853          the diff list. Default `'.'`.
 854
 855        * holding_collection: arvados.collection.Collection | None --- A
 856          collection object used to hold objects for the returned diff
 857          list. By default, a new empty collection is created.
 858        """
 859        changes = []
 860        if holding_collection is None:
 861            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
 862        for k in self:
 863            if k not in end_collection:
 864               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
 865        for k in end_collection:
 866            if k in self:
 867                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
 868                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
 869                elif end_collection[k] != self[k]:
 870                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 871                else:
 872                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 873            else:
 874                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
 875        return changes
 876
 877    @must_be_writable
 878    @synchronized
 879    def apply(self, changes: ChangeList) -> None:
 880        """Apply a list of changes to this collection
 881
 882        This method takes a list of changes generated by
 883        `RichCollectionBase.diff` and applies it to this
 884        collection. Afterward, the state of this collection object will
 885        match the state of `end_collection` passed to `diff`. If a change
 886        conflicts with a local change, it will be saved to an alternate path
 887        indicating the conflict.
 888
 889        Arguments:
 890
 891        * changes: arvados.collection.ChangeList --- The list of differences
 892          generated by `RichCollectionBase.diff`.
 893        """
 894        if changes:
 895            self.set_committed(False)
 896        for change in changes:
 897            event_type = change[0]
 898            path = change[1]
 899            initial = change[2]
 900            local = self.find(path)
 901            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
 902                                                                    time.gmtime()))
 903            if event_type == ADD:
 904                if local is None:
 905                    # No local file at path, safe to copy over new file
 906                    self.copy(initial, path)
 907                elif local is not None and local != initial:
 908                    # There is already local file and it is different:
 909                    # save change to conflict file.
 910                    self.copy(initial, conflictpath)
 911            elif event_type == MOD or event_type == TOK:
 912                final = change[3]
 913                if local == initial:
 914                    # Local matches the "initial" item so it has not
 915                    # changed locally and is safe to update.
 916                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
 917                        # Replace contents of local file with new contents
 918                        local.replace_contents(final)
 919                    else:
 920                        # Overwrite path with new item; this can happen if
 921                        # path was a file and is now a collection or vice versa
 922                        self.copy(final, path, overwrite=True)
 923                else:
 924                    # Local is missing (presumably deleted) or local doesn't
 925                    # match the "start" value, so save change to conflict file
 926                    self.copy(final, conflictpath)
 927            elif event_type == DEL:
 928                if local == initial:
 929                    # Local item matches "initial" value, so it is safe to remove.
 930                    self.remove(path, recursive=True)
 931                # else, the file is modified or already removed, in either
 932                # case we don't want to try to remove it.
 933
 934    def portable_data_hash(self) -> str:
 935        """Get the portable data hash for this collection's manifest"""
 936        if self._manifest_locator and self.committed():
 937            # If the collection is already saved on the API server, and it's committed
 938            # then return API server's PDH response.
 939            return self._portable_data_hash
 940        else:
 941            stripped = self.portable_manifest_text().encode()
 942            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 943
 944    @synchronized
 945    def subscribe(self, callback: ChangeCallback) -> None:
 946        """Set a notify callback for changes to this collection
 947
 948        Arguments:
 949
 950        * callback: arvados.collection.ChangeCallback --- The callable to
 951          call each time the collection is changed.
 952        """
 953        if self._callback is None:
 954            self._callback = callback
 955        else:
 956            raise errors.ArgumentError("A callback is already set on this collection.")
 957
 958    @synchronized
 959    def unsubscribe(self) -> None:
 960        """Remove any notify callback set for changes to this collection"""
 961        if self._callback is not None:
 962            self._callback = None
 963
 964    @synchronized
 965    def notify(
 966            self,
 967            event: ChangeType,
 968            collection: 'RichCollectionBase',
 969            name: str,
 970            item: CollectionItem,
 971    ) -> None:
 972        """Notify any subscribed callback about a change to this collection
 973
 974        .. ATTENTION:: Internal
 975           This method is only meant to be used by other Collection methods.
 976
 977        If a callback has been registered with `RichCollectionBase.subscribe`,
 978        it will be called with information about a change to this collection.
 979        Then this notification will be propagated to this collection's root.
 980
 981        Arguments:
 982
 983        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 984          the collection.
 985
 986        * collection: arvados.collection.RichCollectionBase --- The
 987          collection that was modified.
 988
 989        * name: str --- The name of the file or stream within `collection` that
 990          was modified.
 991
 992        * item: arvados.arvfile.ArvadosFile |
 993          arvados.collection.Subcollection --- The new contents at `name`
 994          within `collection`.
 995        """
 996        if self._callback:
 997            self._callback(event, collection, name, item)
 998        self.root_collection().notify(event, collection, name, item)
 999
1000    @synchronized
1001    def __eq__(self, other: Any) -> bool:
1002        """Indicate whether this collection object is equal to another"""
1003        if other is self:
1004            return True
1005        if not isinstance(other, RichCollectionBase):
1006            return False
1007        if len(self._items) != len(other):
1008            return False
1009        for k in self._items:
1010            if k not in other:
1011                return False
1012            if self._items[k] != other[k]:
1013                return False
1014        return True
1015
1016    def __ne__(self, other: Any) -> bool:
1017        """Indicate whether this collection object is not equal to another"""
1018        return not self.__eq__(other)
1019
1020    @synchronized
1021    def flush(self) -> None:
1022        """Upload any pending data to Keep"""
1023        for e in self.values():
1024            e.flush()

Base class for Collection classes

RichCollectionBase(parent=None)
164    def __init__(self, parent=None):
165        self.parent = parent
166        self._committed = False
167        self._has_remote_blocks = False
168        self._callback = None
169        self._items = {}
parent
def writable(self) -> bool:
180    def writable(self) -> bool:
181        """Indicate whether this collection object can be modified
182
183        This method returns `False` if this object is a `CollectionReader`,
184        else `True`.
185        """
186        raise NotImplementedError()

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

def root_collection(self) -> Collection:
188    def root_collection(self) -> 'Collection':
189        """Get this collection's root collection object
190
191        If you open a subcollection with `Collection.find`, calling this method
192        on that subcollection returns the source Collection object.
193        """
194        raise NotImplementedError()

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def stream_name(self) -> str:
196    def stream_name(self) -> str:
197        """Get the name of the manifest stream represented by this collection
198
199        If you open a subcollection with `Collection.find`, calling this method
200        on that subcollection returns the name of the stream you opened.
201        """
202        raise NotImplementedError()

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

@synchronized
def has_remote_blocks(self) -> bool:
204    @synchronized
205    def has_remote_blocks(self) -> bool:
 206        """Indicate whether the collection refers to remote data
207
208        Returns `True` if the collection manifest includes any Keep locators
209        with a remote hint (`+R`), else `False`.
210        """
211        if self._has_remote_blocks:
212            return True
213        for item in self:
214            if self[item].has_remote_blocks():
215                return True
216        return False

Indicate whether the collection refers to remote data

Returns True if the collection manifest includes any Keep locators with a remote hint (+R), else False.

@synchronized
def set_has_remote_blocks(self, val: bool) -> None:
218    @synchronized
219    def set_has_remote_blocks(self, val: bool) -> None:
220        """Cache whether this collection refers to remote blocks
221
222        .. ATTENTION:: Internal
223           This method is only meant to be used by other Collection methods.
224
225        Set this collection's cached "has remote blocks" flag to the given
226        value.
227        """
228        self._has_remote_blocks = val
229        if self.parent:
230            self.parent.set_has_remote_blocks(val)

Cache whether this collection refers to remote blocks

Set this collection’s cached “has remote blocks” flag to the given value.

@must_be_writable
@synchronized
def find_or_create( self, path: str, create_type: Literal['collection', 'file']) -> Union[arvados.arvfile.ArvadosFile, Collection]:
232    @must_be_writable
233    @synchronized
234    def find_or_create(
235            self,
236            path: str,
237            create_type: CreateType,
238    ) -> CollectionItem:
239        """Get the item at the given path, creating it if necessary
240
241        If `path` refers to a stream in this collection, returns a
242        corresponding `Subcollection` object. If `path` refers to a file in
243        this collection, returns a corresponding
244        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
245        this collection, then this method creates a new object and returns
246        it, creating parent streams as needed. The type of object created is
247        determined by the value of `create_type`.
248
249        Arguments:
250
251        * path: str --- The path to find or create within this collection.
252
253        * create_type: Literal[COLLECTION, FILE] --- The type of object to
254          create at `path` if one does not exist. Passing `COLLECTION`
255          creates a stream and returns the corresponding
256          `Subcollection`. Passing `FILE` creates a new file and returns the
257          corresponding `arvados.arvfile.ArvadosFile`.
258        """
259        pathcomponents = path.split("/", 1)
260        if pathcomponents[0]:
261            item = self._items.get(pathcomponents[0])
262            if len(pathcomponents) == 1:
263                if item is None:
264                    # create new file
265                    if create_type == COLLECTION:
266                        item = Subcollection(self, pathcomponents[0])
267                    else:
268                        item = ArvadosFile(self, pathcomponents[0])
269                    self._items[pathcomponents[0]] = item
270                    self.set_committed(False)
271                    self.notify(ADD, self, pathcomponents[0], item)
272                return item
273            else:
274                if item is None:
275                    # create new collection
276                    item = Subcollection(self, pathcomponents[0])
277                    self._items[pathcomponents[0]] = item
278                    self.set_committed(False)
279                    self.notify(ADD, self, pathcomponents[0], item)
280                if isinstance(item, RichCollectionBase):
281                    return item.find_or_create(pathcomponents[1], create_type)
282                else:
283                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
284        else:
285            return self

Get the item at the given path, creating it if necessary

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of create_type.

Arguments:

  • path: str — The path to find or create within this collection.

  • create_type: Literal[COLLECTION, FILE] — The type of object to create at path if one does not exist. Passing COLLECTION creates a stream and returns the corresponding Subcollection. Passing FILE creates a new file and returns the corresponding arvados.arvfile.ArvadosFile.

@synchronized
def find( self, path: str) -> Union[arvados.arvfile.ArvadosFile, Collection]:
287    @synchronized
288    def find(self, path: str) -> CollectionItem:
289        """Get the item at the given path
290
291        If `path` refers to a stream in this collection, returns a
292        corresponding `Subcollection` object. If `path` refers to a file in
293        this collection, returns a corresponding
294        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
295        this collection, then this method raises `NotADirectoryError`.
296
297        Arguments:
298
299        * path: str --- The path to find or create within this collection.
300        """
301        if not path:
302            raise errors.ArgumentError("Parameter 'path' is empty.")
303
304        pathcomponents = path.split("/", 1)
305        if pathcomponents[0] == '':
306            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
307
308        item = self._items.get(pathcomponents[0])
309        if item is None:
310            return None
311        elif len(pathcomponents) == 1:
312            return item
313        else:
314            if isinstance(item, RichCollectionBase):
315                if pathcomponents[1]:
316                    return item.find(pathcomponents[1])
317                else:
318                    return item
319            else:
320                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

Get the item at the given path

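If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, this method returns None. If path is empty, this method raises ArgumentError. If path begins with /, or a component of path that is followed by / refers to a file rather than a stream, this method raises NotADirectoryError.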

Arguments:

  • path: str — The path to find within this collection.
@synchronized
def mkdirs(self, path: str) -> Subcollection:
322    @synchronized
323    def mkdirs(self, path: str) -> 'Subcollection':
324        """Create and return a subcollection at `path`
325
326        If `path` exists within this collection, raises `FileExistsError`.
327        Otherwise, creates a stream at that path and returns the
328        corresponding `Subcollection`.
329        """
330        if self.find(path) != None:
331            raise IOError(errno.EEXIST, "Directory or file exists", path)
332
333        return self.find_or_create(path, COLLECTION)

Create and return a subcollection at path

If path exists within this collection, raises FileExistsError. Otherwise, creates a stream at that path and returns the corresponding Subcollection.

def open( self, path: str, mode: str = 'r', encoding: Optional[str] = None) -> IO:
335    def open(
336            self,
337            path: str,
338            mode: str="r",
339            encoding: Optional[str]=None
340    ) -> IO:
341        """Open a file-like object within the collection
342
343        This method returns a file-like object that can read and/or write the
344        file located at `path` within the collection. If you attempt to write
345        a `path` that does not exist, the file is created with `find_or_create`.
346        If the file cannot be opened for any other reason, this method raises
347        `OSError` with an appropriate errno.
348
349        Arguments:
350
351        * path: str --- The path of the file to open within this collection
352
353        * mode: str --- The mode to open this file. Supports all the same
354          values as `builtins.open`.
355
356        * encoding: str | None --- The text encoding of the file. Only used
357          when the file is opened in text mode. The default is
358          platform-dependent.
359
360        """
361        if not re.search(r'^[rwa][bt]?\+?$', mode):
362            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
363
364        if mode[0] == 'r' and '+' not in mode:
365            fclass = ArvadosFileReader
366            arvfile = self.find(path)
367        elif not self.writable():
368            raise IOError(errno.EROFS, "Collection is read only")
369        else:
370            fclass = ArvadosFileWriter
371            arvfile = self.find_or_create(path, FILE)
372
373        if arvfile is None:
374            raise IOError(errno.ENOENT, "File not found", path)
375        if not isinstance(arvfile, ArvadosFile):
376            raise IOError(errno.EISDIR, "Is a directory", path)
377
378        if mode[0] == 'w':
379            arvfile.truncate(0)
380
381        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
382        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
383        if 'b' not in mode:
384            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
385            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
386        return f

Open a file-like object within the collection

This method returns a file-like object that can read and/or write the file located at path within the collection. If you attempt to write a path that does not exist, the file is created with find_or_create. If the file cannot be opened for any other reason, this method raises OSError with an appropriate errno.

Arguments:

  • path: str — The path of the file to open within this collection

  • mode: str — The mode to open this file. Supports all the same values as builtins.open.

  • encoding: str | None — The text encoding of the file. Only used when the file is opened in text mode. The default is platform-dependent.

def modified(self) -> bool:
388    def modified(self) -> bool:
389        """Indicate whether this collection has an API server record
390
391        Returns `False` if this collection corresponds to a record loaded from
392        the API server, `True` otherwise.
393        """
394        return not self.committed()

Indicate whether this collection has been modified (the inverse of committed)

Returns False if this collection corresponds to a record loaded from the API server, True otherwise.

@synchronized
def committed(self):
396    @synchronized
397    def committed(self):
398        """Indicate whether this collection has an API server record
399
400        Returns `True` if this collection corresponds to a record loaded from
401        the API server, `False` otherwise.
402        """
403        return self._committed

Indicate whether this collection has an API server record

Returns True if this collection corresponds to a record loaded from the API server, False otherwise.

@synchronized
def set_committed(self, value: bool = True):
405    @synchronized
406    def set_committed(self, value: bool=True):
407        """Cache whether this collection has an API server record
408
409        .. ATTENTION:: Internal
410           This method is only meant to be used by other Collection methods.
411
412        Set this collection's cached "committed" flag to the given
413        value and propagates it as needed.
414        """
415        if value == self._committed:
416            return
417        if value:
418            for k,v in self._items.items():
419                v.set_committed(True)
420            self._committed = True
421        else:
422            self._committed = False
423            if self.parent is not None:
424                self.parent.set_committed(False)

Cache whether this collection has an API server record

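Set this collection’s cached “committed” flag to the given value and propagate it as needed. Setting it to True also marks every item directly in this collection as committed. Setting it to False also marks the parent collection, if any, as uncommitted. If the flag already has the given value, nothing changes.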

@synchronized
def keys(self) -> Iterator[str]:
474    @synchronized
475    def keys(self) -> Iterator[str]:
476        """Iterate names of streams and files in this collection
477
478        This method does not recurse. It only iterates the contents of this
479        collection's corresponding stream.
480        """
481        return self._items.keys()

Iterate names of streams and files in this collection

This method does not recurse. It only iterates the contents of this collection’s corresponding stream.

@synchronized
def values( self) -> List[Union[arvados.arvfile.ArvadosFile, Collection]]:
483    @synchronized
484    def values(self) -> List[CollectionItem]:
485        """Get a list of objects in this collection's stream
486
487        The return value includes a `Subcollection` for every stream, and an
488        `arvados.arvfile.ArvadosFile` for every file, directly within this
489        collection's stream.  This method does not recurse.
490        """
491        return list(self._items.values())

Get a list of objects in this collection’s stream

The return value includes a Subcollection for every stream, and an arvados.arvfile.ArvadosFile for every file, directly within this collection’s stream. This method does not recurse.

@synchronized
def items( self) -> List[Tuple[str, Union[arvados.arvfile.ArvadosFile, Collection]]]:
493    @synchronized
494    def items(self) -> List[Tuple[str, CollectionItem]]:
495        """Get a list of `(name, object)` tuples from this collection's stream
496
497        The return value includes a `Subcollection` for every stream, and an
498        `arvados.arvfile.ArvadosFile` for every file, directly within this
499        collection's stream.  This method does not recurse.
500        """
501        return list(self._items.items())

Get a list of (name, object) tuples from this collection’s stream

The return value includes a Subcollection for every stream, and an arvados.arvfile.ArvadosFile for every file, directly within this collection’s stream. This method does not recurse.

def exists(self, path: str) -> bool:
503    def exists(self, path: str) -> bool:
504        """Indicate whether this collection includes an item at `path`
505
506        This method returns `True` if `path` refers to a stream or file within
507        this collection, else `False`.
508
509        Arguments:
510
511        * path: str --- The path to check for existence within this collection
512        """
513        return self.find(path) is not None

Indicate whether this collection includes an item at path

This method returns True if path refers to a stream or file within this collection, else False.

Arguments:

  • path: str — The path to check for existence within this collection
@must_be_writable
@synchronized
def remove(self, path: str, recursive: bool = False) -> None:
515    @must_be_writable
516    @synchronized
517    def remove(self, path: str, recursive: bool=False) -> None:
518        """Remove the file or stream at `path`
519
520        Arguments:
521
522        * path: str --- The path of the item to remove from the collection
523
524        * recursive: bool --- Controls the method's behavior if `path` refers
525          to a nonempty stream. If `False` (the default), this method raises
526          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
527          items under the stream.
528        """
529        if not path:
530            raise errors.ArgumentError("Parameter 'path' is empty.")
531
532        pathcomponents = path.split("/", 1)
533        item = self._items.get(pathcomponents[0])
534        if item is None:
535            raise IOError(errno.ENOENT, "File not found", path)
536        if len(pathcomponents) == 1:
537            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
538                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
539            deleteditem = self._items[pathcomponents[0]]
540            del self._items[pathcomponents[0]]
541            self.set_committed(False)
542            self.notify(DEL, self, pathcomponents[0], deleteditem)
543        else:
544            item.remove(pathcomponents[1], recursive=recursive)

Remove the file or stream at path

Arguments:

  • path: str — The path of the item to remove from the collection

  • recursive: bool — Controls the method’s behavior if path refers to a nonempty stream. If False (the default), this method raises OSError with errno ENOTEMPTY. If True, this method removes all items under the stream.

def clone(self):
550    def clone(self):
551        raise NotImplementedError()
@must_be_writable
@synchronized
def add( self, source_obj: Union[arvados.arvfile.ArvadosFile, Collection], target_name: str, overwrite: bool = False, reparent: bool = False) -> None:
553    @must_be_writable
554    @synchronized
555    def add(
556            self,
557            source_obj: CollectionItem,
558            target_name: str,
559            overwrite: bool=False,
560            reparent: bool=False,
561    ) -> None:
562        """Copy or move a file or subcollection object to this collection
563
564        Arguments:
565
566        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
567          to add to this collection
568
569        * target_name: str --- The path inside this collection where
570          `source_obj` should be added.
571
572        * overwrite: bool --- Controls the behavior of this method when the
573          collection already contains an object at `target_name`. If `False`
574          (the default), this method will raise `FileExistsError`. If `True`,
575          the object at `target_name` will be replaced with `source_obj`.
576
577        * reparent: bool --- Controls whether this method copies or moves
578          `source_obj`. If `False` (the default), `source_obj` is copied into
579          this collection. If `True`, `source_obj` is moved into this
580          collection.
581        """
582        if target_name in self and not overwrite:
583            raise IOError(errno.EEXIST, "File already exists", target_name)
584
585        modified_from = None
586        if target_name in self:
587            modified_from = self[target_name]
588
589        # Actually make the move or copy.
590        if reparent:
591            source_obj._reparent(self, target_name)
592            item = source_obj
593        else:
594            item = source_obj.clone(self, target_name)
595
596        self._items[target_name] = item
597        self.set_committed(False)
598        if not self._has_remote_blocks and source_obj.has_remote_blocks():
599            self.set_has_remote_blocks(True)
600
601        if modified_from:
602            self.notify(MOD, self, target_name, (modified_from, item))
603        else:
604            self.notify(ADD, self, target_name, item)

Copy or move a file or subcollection object to this collection

Arguments:

  • source_obj: arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection

  • target_name: str — The path inside this collection where source_obj should be added.

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_name. If False (the default), this method will raise FileExistsError. If True, the object at target_name will be replaced with source_obj.

  • reparent: bool — Controls whether this method copies or moves source_obj. If False (the default), source_obj is copied into this collection. If True, source_obj is moved into this collection.

@must_be_writable
@synchronized
def copy( self, source: Union[str, arvados.arvfile.ArvadosFile, Collection], target_path: str, source_collection: Optional[RichCollectionBase] = None, overwrite: bool = False) -> None:
646    @must_be_writable
647    @synchronized
648    def copy(
649            self,
650            source: Union[str, CollectionItem],
651            target_path: str,
652            source_collection: Optional['RichCollectionBase']=None,
653            overwrite: bool=False,
654    ) -> None:
655        """Copy a file or subcollection object to this collection
656
657        Arguments:
658
659        * source: str | arvados.arvfile.ArvadosFile |
660          arvados.collection.Subcollection --- The file or subcollection to
661          add to this collection. If `source` is a str, the object will be
662          found by looking up this path from `source_collection` (see
663          below).
664
665        * target_path: str --- The path inside this collection where the
666          source object should be added.
667
668        * source_collection: arvados.collection.Collection | None --- The
669          collection to find the source object from when `source` is a
670          path. Defaults to the current collection (`self`).
671
672        * overwrite: bool --- Controls the behavior of this method when the
673          collection already contains an object at `target_path`. If `False`
674          (the default), this method will raise `FileExistsError`. If `True`,
675          the object at `target_path` will be replaced with `source_obj`.
676        """
677        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
678        target_dir.add(source_obj, target_name, overwrite, False)

Copy a file or subcollection object to this collection

Arguments:

  • source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If source is a str, the object will be found by looking up this path from source_collection (see below).

  • target_path: str — The path inside this collection where the source object should be added.

  • source_collection: Collection | None — The collection to find the source object from when source is a path. Defaults to the current collection (self).

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_path. If False (the default), this method will raise FileExistsError. If True, the object at target_path will be replaced with the source object.

@must_be_writable
@synchronized
def rename( self, source: Union[str, arvados.arvfile.ArvadosFile, Collection], target_path: str, source_collection: Optional[RichCollectionBase] = None, overwrite: bool = False) -> None:
680    @must_be_writable
681    @synchronized
682    def rename(
683            self,
684            source: Union[str, CollectionItem],
685            target_path: str,
686            source_collection: Optional['RichCollectionBase']=None,
687            overwrite: bool=False,
688    ) -> None:
689        """Move a file or subcollection object to this collection
690
691        Arguments:
692
693        * source: str | arvados.arvfile.ArvadosFile |
694          arvados.collection.Subcollection --- The file or subcollection to
695          add to this collection. If `source` is a str, the object will be
696          found by looking up this path from `source_collection` (see
697          below).
698
699        * target_path: str --- The path inside this collection where the
700          source object should be added.
701
702        * source_collection: arvados.collection.Collection | None --- The
703          collection to find the source object from when `source` is a
704          path. Defaults to the current collection (`self`).
705
706        * overwrite: bool --- Controls the behavior of this method when the
707          collection already contains an object at `target_path`. If `False`
708          (the default), this method will raise `FileExistsError`. If `True`,
709          the object at `target_path` will be replaced with `source_obj`.
710        """
711        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
712        if not source_obj.writable():
713            raise IOError(errno.EROFS, "Source collection is read only", source)
714        target_dir.add(source_obj, target_name, overwrite, True)

Move a file or subcollection object to this collection

Arguments:

  • source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If source is a str, the object will be found by looking up this path from source_collection (see below).

  • target_path: str — The path inside this collection where the source object should be added.

  • source_collection: Collection | None — The collection to find the source object from when source is a path. Defaults to the current collection (self).

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_path. If False (the default), this method will raise FileExistsError. If True, the object at target_path will be replaced with the source object.

def portable_manifest_text(self, stream_name: str = '.') -> str:
716    def portable_manifest_text(self, stream_name: str=".") -> str:
717        """Get the portable manifest text for this collection
718
719        The portable manifest text is normalized, and does not include access
720        tokens. This method does not flush outstanding blocks to Keep.
721
722        Arguments:
723
724        * stream_name: str --- The name to use for this collection's stream in
725          the generated manifest. Default `'.'`.
726        """
727        return self._get_manifest_text(stream_name, True, True)

Get the portable manifest text for this collection

The portable manifest text is normalized, and does not include access tokens. This method does not flush outstanding blocks to Keep.

Arguments:

  • stream_name: str — The name to use for this collection’s stream in the generated manifest. Default '.'.
@synchronized
def manifest_text( self, stream_name: str = '.', strip: bool = False, normalize: bool = False, only_committed: bool = False) -> str:
729    @synchronized
730    def manifest_text(
731            self,
732            stream_name: str=".",
733            strip: bool=False,
734            normalize: bool=False,
735            only_committed: bool=False,
736    ) -> str:
737        """Get the manifest text for this collection
738
739        Arguments:
740
741        * stream_name: str --- The name to use for this collection's stream in
742          the generated manifest. Default `'.'`.
743
744        * strip: bool --- Controls whether or not the returned manifest text
745          includes access tokens. If `False` (the default), the manifest text
746          will include access tokens. If `True`, the manifest text will not
747          include access tokens.
748
749        * normalize: bool --- Controls whether or not the returned manifest
750          text is normalized. Default `False`.
751
752        * only_committed: bool --- Controls whether or not this method uploads
753          pending data to Keep before building and returning the manifest text.
754          If `False` (the default), this method will finish uploading all data
755          to Keep, then return the final manifest. If `True`, this method will
756          build and return a manifest that only refers to the data that has
757          finished uploading at the time this method was called.
758        """
759        if not only_committed:
760            self._my_block_manager().commit_all()
761        return self._get_manifest_text(stream_name, strip, normalize,
762                                       only_committed=only_committed)

Get the manifest text for this collection

Arguments:

  • stream_name: str — The name to use for this collection’s stream in the generated manifest. Default '.'.

  • strip: bool — Controls whether or not the returned manifest text includes access tokens. If False (the default), the manifest text will include access tokens. If True, the manifest text will not include access tokens.

  • normalize: bool — Controls whether or not the returned manifest text is normalized. Default False.

  • only_committed: bool — Controls whether or not this method uploads pending data to Keep before building and returning the manifest text. If False (the default), this method will finish uploading all data to Keep, then return the final manifest. If True, this method will build and return a manifest that only refers to the data that has finished uploading at the time this method was called.

@synchronized
def diff( self, end_collection: RichCollectionBase, prefix: str = '.', holding_collection: Optional[Collection] = None) -> List[Union[Tuple[Literal['add', 'del'], str, Collection], Tuple[Literal['mod', 'tok'], str, Collection, Collection]]]:
836    @synchronized
837    def diff(
838            self,
839            end_collection: 'RichCollectionBase',
840            prefix: str=".",
841            holding_collection: Optional['Collection']=None,
842    ) -> ChangeList:
843        """Build a list of differences between this collection and another
844
845        Arguments:
846
847        * end_collection: arvados.collection.RichCollectionBase --- A
848          collection object with the desired end state. The returned diff
849          list will describe how to go from the current collection object
850          `self` to `end_collection`.
851
852        * prefix: str --- The name to use for this collection's stream in
853          the diff list. Default `'.'`.
854
855        * holding_collection: arvados.collection.Collection | None --- A
856          collection object used to hold objects for the returned diff
857          list. By default, a new empty collection is created.
858        """
859        changes = []
860        if holding_collection is None:
861            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
862        for k in self:
863            if k not in end_collection:
864               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
865        for k in end_collection:
866            if k in self:
867                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
868                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
869                elif end_collection[k] != self[k]:
870                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
871                else:
872                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
873            else:
874                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
875        return changes

Build a list of differences between this collection and another

Arguments:

  • end_collection: RichCollectionBase — A collection object with the desired end state. The returned diff list will describe how to go from the current collection object self to end_collection.

  • prefix: str — The name to use for this collection’s stream in the diff list. Default '.'.

  • holding_collection: Collection | None — A collection object used to hold objects for the returned diff list. By default, a new empty collection is created.

@must_be_writable
@synchronized
def apply( self, changes: List[Union[Tuple[Literal['add', 'del'], str, Collection], Tuple[Literal['mod', 'tok'], str, Collection, Collection]]]) -> None:
877    @must_be_writable
878    @synchronized
879    def apply(self, changes: ChangeList) -> None:
880        """Apply a list of changes from to this collection
881
882        This method takes a list of changes generated by
883        `RichCollectionBase.diff` and applies it to this
884        collection. Afterward, the state of this collection object will
885        match the state of `end_collection` passed to `diff`. If a change
886        conflicts with a local change, it will be saved to an alternate path
887        indicating the conflict.
888
889        Arguments:
890
891        * changes: arvados.collection.ChangeList --- The list of differences
892          generated by `RichCollectionBase.diff`.
893        """
894        if changes:
895            self.set_committed(False)
896        for change in changes:
897            event_type = change[0]
898            path = change[1]
899            initial = change[2]
900            local = self.find(path)
901            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
902                                                                    time.gmtime()))
903            if event_type == ADD:
904                if local is None:
905                    # No local file at path, safe to copy over new file
906                    self.copy(initial, path)
907                elif local is not None and local != initial:
908                    # There is already local file and it is different:
909                    # save change to conflict file.
910                    self.copy(initial, conflictpath)
911            elif event_type == MOD or event_type == TOK:
912                final = change[3]
913                if local == initial:
914                    # Local matches the "initial" item so it has not
915                    # changed locally and is safe to update.
916                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
917                        # Replace contents of local file with new contents
918                        local.replace_contents(final)
919                    else:
920                        # Overwrite path with new item; this can happen if
921                        # path was a file and is now a collection or vice versa
922                        self.copy(final, path, overwrite=True)
923                else:
924                    # Local is missing (presumably deleted) or local doesn't
925                    # match the "start" value, so save change to conflict file
926                    self.copy(final, conflictpath)
927            elif event_type == DEL:
928                if local == initial:
929                    # Local item matches "initial" value, so it is safe to remove.
930                    self.remove(path, recursive=True)
931                # else, the file is modified or already removed, in either
932                # case we don't want to try to remove it.

Apply a list of changes to this collection

This method takes a list of changes generated by RichCollectionBase.diff and applies it to this collection. Afterward, the state of this collection object will match the state of end_collection passed to diff. If a change conflicts with a local change, it will be saved to an alternate path indicating the conflict.

Arguments:

  • changes: ChangeList — The list of differences generated by RichCollectionBase.diff.

def portable_data_hash(self) -> str:
934    def portable_data_hash(self) -> str:
935        """Get the portable data hash for this collection's manifest"""
936        if self._manifest_locator and self.committed():
937            # If the collection is already saved on the API server, and it's committed
938            # then return API server's PDH response.
939            return self._portable_data_hash
940        else:
941            stripped = self.portable_manifest_text().encode()
942            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

Get the portable data hash for this collection’s manifest

@synchronized
def subscribe( self, callback: Callable[[Literal['add', 'del', 'mod', 'tok'], Collection, str, Union[arvados.arvfile.ArvadosFile, Collection]], object]) -> None:
944    @synchronized
945    def subscribe(self, callback: ChangeCallback) -> None:
946        """Set a notify callback for changes to this collection
947
948        Arguments:
949
950        * callback: arvados.collection.ChangeCallback --- The callable to
951          call each time the collection is changed.
952        """
953        if self._callback is None:
954            self._callback = callback
955        else:
956            raise errors.ArgumentError("A callback is already set on this collection.")

Set a notify callback for changes to this collection

Arguments:

  • callback: ChangeCallback — The callable to call each time the collection is changed.

@synchronized
def unsubscribe(self) -> None:
958    @synchronized
959    def unsubscribe(self) -> None:
960        """Remove any notify callback set for changes to this collection"""
961        if self._callback is not None:
962            self._callback = None

Remove any notify callback set for changes to this collection

@synchronized
def notify( self, event: Literal['add', 'del', 'mod', 'tok'], collection: RichCollectionBase, name: str, item: Union[arvados.arvfile.ArvadosFile, Collection]) -> None:
964    @synchronized
965    def notify(
966            self,
967            event: ChangeType,
968            collection: 'RichCollectionBase',
969            name: str,
970            item: CollectionItem,
971    ) -> None:
972        """Notify any subscribed callback about a change to this collection
973
974        .. ATTENTION:: Internal
975           This method is only meant to be used by other Collection methods.
976
977        If a callback has been registered with `RichCollectionBase.subscribe`,
978        it will be called with information about a change to this collection.
979        Then this notification will be propagated to this collection's root.
980
981        Arguments:
982
983        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
984          the collection.
985
986        * collection: arvados.collection.RichCollectionBase --- The
987          collection that was modified.
988
989        * name: str --- The name of the file or stream within `collection` that
990          was modified.
991
992        * item: arvados.arvfile.ArvadosFile |
993          arvados.collection.Subcollection --- The new contents at `name`
994          within `collection`.
995        """
996        if self._callback:
997            self._callback(event, collection, name, item)
998        self.root_collection().notify(event, collection, name, item)

Notify any subscribed callback about a change to this collection

If a callback has been registered with RichCollectionBase.subscribe, it will be called with information about a change to this collection. Then this notification will be propagated to this collection’s root.

Arguments:

  • event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.

  • collection: RichCollectionBase — The collection that was modified.

  • name: str — The name of the file or stream within collection that was modified.

  • item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at name within collection.

@synchronized
def flush(self) -> None:
1020    @synchronized
1021    def flush(self) -> None:
1022        """Upload any pending data to Keep"""
1023        for e in self.values():
1024            e.flush()

Upload any pending data to Keep

Inherited Members
CollectionBase
stripped_manifest
class Collection(RichCollectionBase):
1027class Collection(RichCollectionBase):
1028    """Read and manipulate an Arvados collection
1029
1030    This class provides a high-level interface to create, read, and update
1031    Arvados collections and their contents. Refer to the Arvados Python SDK
1032    cookbook for [an introduction to using the Collection class][cookbook].
1033
1034    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1035    """
1036
1037    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1038                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1039                 keep_client: Optional['arvados.keep.KeepClient']=None,
1040                 num_retries: int=10,
1041                 parent: Optional['Collection']=None,
1042                 apiconfig: Optional[Mapping[str, str]]=None,
1043                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1044                 replication_desired: Optional[int]=None,
1045                 storage_classes_desired: Optional[List[str]]=None,
1046                 put_threads: Optional[int]=None):
1047        """Initialize a Collection object
1048
1049        Arguments:
1050
1051        * manifest_locator_or_text: str | None --- This string can contain a
1052          collection manifest text, portable data hash, or UUID. When given a
1053          portable data hash or UUID, this instance will load a collection
1054          record from the API server. Otherwise, this instance will represent a
1055          new collection without an API server record. The default value `None`
1056          instantiates a new collection with an empty manifest.
1057
1058        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1059          Arvados API client object this instance uses to make requests. If
1060          none is given, this instance creates its own client using the
1061          settings from `apiconfig` (see below). If your client instantiates
1062          many Collection objects, you can help limit memory utilization by
1063          calling `arvados.api.api` to construct an
1064          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1065          for every Collection.
1066
1067        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1068          object this instance uses to make requests. If none is given, this
1069          instance creates its own client using its `api_client`.
1070
1071        * num_retries: int --- The number of times that client requests are
1072          retried. Default 10.
1073
1074        * parent: arvados.collection.Collection | None --- The parent Collection
1075          object of this instance, if any. This argument is primarily used by
1076          other Collection methods; user client code shouldn't need to use it.
1077
1078        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1079          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1080          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1081          Collection object constructs one from these settings. If no
1082          mapping is provided, calls `arvados.config.settings` to get these
1083          parameters from user configuration.
1084
1085        * block_manager: arvados.arvfile._BlockManager | None --- The
1086          _BlockManager object used by this instance to coordinate reading
1087          and writing Keep data blocks. If none is given, this instance
1088          constructs its own. This argument is primarily used by other
1089          Collection methods; user client code shouldn't need to use it.
1090
1091        * replication_desired: int | None --- This controls both the value of
1092          the `replication_desired` field on API collection records saved by
1093          this class, as well as the number of Keep services that the object
1094          writes new data blocks to. If none is given, uses the default value
1095          configured for the cluster.
1096
1097        * storage_classes_desired: list[str] | None --- This controls both
1098          the value of the `storage_classes_desired` field on API collection
1099          records saved by this class, as well as selecting which specific
1100          Keep services the object writes new data blocks to. If none is
1101          given, defaults to an empty list.
1102
1103        * put_threads: int | None --- The number of threads to run
1104          simultaneously to upload data blocks to Keep. This value is used when
1105          building a new `block_manager`. It is unused when a `block_manager`
1106          is provided.
1107        """
1108
1109        if storage_classes_desired and type(storage_classes_desired) is not list:
1110            raise errors.ArgumentError("storage_classes_desired must be list type.")
1111
1112        super(Collection, self).__init__(parent)
1113        self._api_client = api_client
1114        self._keep_client = keep_client
1115
1116        # Use the keep client from ThreadSafeApiCache
1117        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1118            self._keep_client = self._api_client.keep
1119
1120        self._block_manager = block_manager
1121        self.replication_desired = replication_desired
1122        self._storage_classes_desired = storage_classes_desired
1123        self.put_threads = put_threads
1124
1125        if apiconfig:
1126            self._config = apiconfig
1127        else:
1128            self._config = config.settings()
1129
1130        self.num_retries = num_retries
1131        self._manifest_locator = None
1132        self._manifest_text = None
1133        self._portable_data_hash = None
1134        self._api_response = None
1135        self._past_versions = set()
1136
1137        self.lock = threading.RLock()
1138        self.events = None
1139
1140        if manifest_locator_or_text:
1141            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1142                self._manifest_locator = manifest_locator_or_text
1143            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1144                self._manifest_locator = manifest_locator_or_text
1145                if not self._has_local_collection_uuid():
1146                    self._has_remote_blocks = True
1147            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1148                self._manifest_text = manifest_locator_or_text
1149                if '+R' in self._manifest_text:
1150                    self._has_remote_blocks = True
1151            else:
1152                raise errors.ArgumentError(
1153                    "Argument to CollectionReader is not a manifest or a collection UUID")
1154
1155            try:
1156                self._populate()
1157            except errors.SyntaxError as e:
1158                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1159
1160    def storage_classes_desired(self) -> List[str]:
1161        """Get this collection's `storage_classes_desired` value"""
1162        return self._storage_classes_desired or []
1163
1164    def root_collection(self) -> 'Collection':
1165        return self
1166
1167    def get_properties(self) -> Properties:
1168        """Get this collection's properties
1169
1170        This method always returns a dict. If this collection object does not
1171        have an associated API record, or that record does not have any
1172        properties set, this method returns an empty dict.
1173        """
1174        if self._api_response and self._api_response["properties"]:
1175            return self._api_response["properties"]
1176        else:
1177            return {}
1178
1179    def get_trash_at(self) -> Optional[datetime.datetime]:
1180        """Get this collection's `trash_at` field
1181
1182        This method parses the `trash_at` field of the collection's API
1183        record and returns a datetime from it. If that field is not set, or
1184        this collection object does not have an associated API record,
1185        returns None.
1186        """
1187        if self._api_response and self._api_response["trash_at"]:
1188            try:
1189                return ciso8601.parse_datetime(self._api_response["trash_at"])
1190            except ValueError:
1191                return None
1192        else:
1193            return None
1194
1195    def stream_name(self) -> str:
1196        return "."
1197
1198    def writable(self) -> bool:
1199        return True
1200
1201    @synchronized
1202    def known_past_version(
1203            self,
1204            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1205    ) -> bool:
1206        """Indicate whether an API record for this collection has been seen before
1207
1208        As this collection object loads records from the API server, it records
1209        their `modified_at` and `portable_data_hash` fields. This method accepts
1210        a 2-tuple with values for those fields, and returns `True` if the
1211        combination was previously loaded.
1212        """
1213        return modified_at_and_portable_data_hash in self._past_versions
1214
1215    @synchronized
1216    @retry_method
1217    def update(
1218            self,
1219            other: Optional['Collection']=None,
1220            num_retries: Optional[int]=None,
1221    ) -> None:
1222        """Merge another collection's contents into this one
1223
1224        This method compares the manifest of this collection instance with
1225        another, then updates this instance's manifest with changes from the
1226        other, renaming files to flag conflicts where necessary.
1227
1228        When called without any arguments, this method reloads the collection's
1229        API record, and updates this instance with any changes that have
1230        appeared server-side. If this instance does not have a corresponding
1231        API record, this method raises `arvados.errors.ArgumentError`.
1232
1233        Arguments:
1234
1235        * other: arvados.collection.Collection | None --- The collection
1236          whose contents should be merged into this instance. When not
1237          provided, this method reloads this collection's API record and
1238          constructs a Collection object from it.  If this instance does not
1239          have a corresponding API record, this method raises
1240          `arvados.errors.ArgumentError`.
1241
1242        * num_retries: int | None --- The number of times to retry reloading
1243          the collection's API record from the API server. If not specified,
1244          uses the `num_retries` provided when this instance was constructed.
1245        """
1246        if other is None:
1247            if self._manifest_locator is None:
1248                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1249            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1250            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1251                response.get("portable_data_hash") != self.portable_data_hash()):
1252                # The record on the server is different from our current one, but we've seen it before,
1253                # so ignore it because it's already been merged.
1254                # However, if it's the same as our current record, proceed with the update, because we want to update
1255                # our tokens.
1256                return
1257            else:
1258                self._remember_api_response(response)
1259            other = CollectionReader(response["manifest_text"])
1260        baseline = CollectionReader(self._manifest_text)
1261        self.apply(baseline.diff(other))
1262        self._manifest_text = self.manifest_text()
1263
1264    @synchronized
1265    def _my_api(self):
1266        if self._api_client is None:
1267            self._api_client = ThreadSafeApiCache(self._config, version='v1')
1268            if self._keep_client is None:
1269                self._keep_client = self._api_client.keep
1270        return self._api_client
1271
1272    @synchronized
1273    def _my_keep(self):
1274        if self._keep_client is None:
1275            if self._api_client is None:
1276                self._my_api()
1277            else:
1278                self._keep_client = KeepClient(api_client=self._api_client)
1279        return self._keep_client
1280
1281    @synchronized
1282    def _my_block_manager(self):
1283        if self._block_manager is None:
1284            copies = (self.replication_desired or
1285                      self._my_api()._rootDesc.get('defaultCollectionReplication',
1286                                                   2))
1287            self._block_manager = _BlockManager(self._my_keep(),
1288                                                copies=copies,
1289                                                put_threads=self.put_threads,
1290                                                num_retries=self.num_retries,
1291                                                storage_classes_func=self.storage_classes_desired)
1292        return self._block_manager
1293
1294    def _remember_api_response(self, response):
1295        self._api_response = response
1296        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1297
1298    def _populate_from_api_server(self):
1299        # As in KeepClient itself, we must wait until the last
1300        # possible moment to instantiate an API client, in order to
1301        # avoid tripping up clients that don't have access to an API
1302        # server.  If we do build one, make sure our Keep client uses
1303        # it.  If instantiation fails, we'll fall back to the except
1304        # clause, just like any other Collection lookup
1305        # failure. Return an exception, or None if successful.
1306        self._remember_api_response(self._my_api().collections().get(
1307            uuid=self._manifest_locator).execute(
1308                num_retries=self.num_retries))
1309        self._manifest_text = self._api_response['manifest_text']
1310        self._portable_data_hash = self._api_response['portable_data_hash']
1311        # If not overriden via kwargs, we should try to load the
1312        # replication_desired and storage_classes_desired from the API server
1313        if self.replication_desired is None:
1314            self.replication_desired = self._api_response.get('replication_desired', None)
1315        if self._storage_classes_desired is None:
1316            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1317
1318    def _populate(self):
1319        if self._manifest_text is None:
1320            if self._manifest_locator is None:
1321                return
1322            else:
1323                self._populate_from_api_server()
1324        self._baseline_manifest = self._manifest_text
1325        self._import_manifest(self._manifest_text)
1326
1327    def _has_collection_uuid(self):
1328        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1329
1330    def _has_local_collection_uuid(self):
1331        return self._has_collection_uuid and \
1332            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1333
1334    def __enter__(self):
1335        return self
1336
1337    def __exit__(self, exc_type, exc_value, traceback):
1338        """Exit a context with this collection instance
1339
1340        If no exception was raised inside the context block, and this
1341        collection is writable and has a corresponding API record, that
1342        record will be updated to match the state of this instance at the end
1343        of the block.
1344        """
1345        if exc_type is None:
1346            if self.writable() and self._has_collection_uuid():
1347                self.save()
1348        self.stop_threads()
1349
1350    def stop_threads(self) -> None:
1351        """Stop background Keep upload/download threads"""
1352        if self._block_manager is not None:
1353            self._block_manager.stop_threads()
1354
1355    @synchronized
1356    def manifest_locator(self) -> Optional[str]:
1357        """Get this collection's manifest locator, if any
1358
1359        * If this collection instance is associated with an API record with a
1360          UUID, return that.
1361        * Otherwise, if this collection instance was loaded from an API record
1362          by portable data hash, return that.
1363        * Otherwise, return `None`.
1364        """
1365        return self._manifest_locator
1366
1367    @synchronized
1368    def clone(
1369            self,
1370            new_parent: Optional['Collection']=None,
1371            new_name: Optional[str]=None,
1372            readonly: bool=False,
1373            new_config: Optional[Mapping[str, str]]=None,
1374    ) -> 'Collection':
1375        """Create a Collection object with the same contents as this instance
1376
1377        This method creates a new Collection object with contents that match
1378        this instance's. The new collection will not be associated with any API
1379        record.
1380
1381        Arguments:
1382
1383        * new_parent: arvados.collection.Collection | None --- This value is
1384          passed to the new Collection's constructor as the `parent`
1385          argument.
1386
1387        * new_name: str | None --- This value is unused.
1388
1389        * readonly: bool --- If this value is true, this method constructs and
1390          returns a `CollectionReader`. Otherwise, it returns a mutable
1391          `Collection`. Default `False`.
1392
1393        * new_config: Mapping[str, str] | None --- This value is passed to the
1394          new Collection's constructor as `apiconfig`. If no value is provided,
1395          defaults to the configuration passed to this instance's constructor.
1396        """
1397        if new_config is None:
1398            new_config = self._config
1399        if readonly:
1400            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1401        else:
1402            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1403
1404        newcollection._clonefrom(self)
1405        return newcollection
1406
1407    @synchronized
1408    def api_response(self) -> Optional[Dict[str, Any]]:
1409        """Get this instance's associated API record
1410
1411        If this Collection instance has an associated API record, return it.
1412        Otherwise, return `None`.
1413        """
1414        return self._api_response
1415
1416    def find_or_create(
1417            self,
1418            path: str,
1419            create_type: CreateType,
1420    ) -> CollectionItem:
1421        if path == ".":
1422            return self
1423        else:
1424            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1425
1426    def find(self, path: str) -> CollectionItem:
1427        if path == ".":
1428            return self
1429        else:
1430            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1431
1432    def remove(self, path: str, recursive: bool=False) -> None:
1433        if path == ".":
1434            raise errors.ArgumentError("Cannot remove '.'")
1435        else:
1436            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1437
1438    @must_be_writable
1439    @synchronized
1440    @retry_method
1441    def save(
1442            self,
1443            properties: Optional[Properties]=None,
1444            storage_classes: Optional[StorageClasses]=None,
1445            trash_at: Optional[datetime.datetime]=None,
1446            merge: bool=True,
1447            num_retries: Optional[int]=None,
1448            preserve_version: bool=False,
1449    ) -> str:
1450        """Save collection to an existing API record
1451
1452        This method updates the instance's corresponding API record to match
1453        the instance's state. If this instance does not have a corresponding API
1454        record yet, raises `AssertionError`. (To create a new API record, use
1455        `Collection.save_new`.) This method returns the saved collection
1456        manifest.
1457
1458        Arguments:
1459
1460        * properties: dict[str, Any] | None --- If provided, the API record will
1461          be updated with these properties. Note this will completely replace
1462          any existing properties.
1463
1464        * storage_classes: list[str] | None --- If provided, the API record will
1465          be updated with this value in the `storage_classes_desired` field.
1466          This value will also be saved on the instance and used for any
1467          changes that follow.
1468
1469        * trash_at: datetime.datetime | None --- If provided, the API record
1470          will be updated with this value in the `trash_at` field.
1471
1472        * merge: bool --- If `True` (the default), this method will first
1473          reload this collection's API record, and merge any new contents into
1474          this instance before saving changes. See `Collection.update` for
1475          details.
1476
1477        * num_retries: int | None --- The number of times to retry reloading
1478          the collection's API record from the API server. If not specified,
1479          uses the `num_retries` provided when this instance was constructed.
1480
1481        * preserve_version: bool --- This value will be passed to directly
1482          to the underlying API call. If `True`, the Arvados API will
1483          preserve the versions of this collection both immediately before
1484          and after the update. If `True` when the API server is not
1485          configured with collection versioning, this method raises
1486          `arvados.errors.ArgumentError`.
1487        """
1488        if properties and type(properties) is not dict:
1489            raise errors.ArgumentError("properties must be dictionary type.")
1490
1491        if storage_classes and type(storage_classes) is not list:
1492            raise errors.ArgumentError("storage_classes must be list type.")
1493        if storage_classes:
1494            self._storage_classes_desired = storage_classes
1495
1496        if trash_at and type(trash_at) is not datetime.datetime:
1497            raise errors.ArgumentError("trash_at must be datetime type.")
1498
1499        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1500            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1501
1502        body={}
1503        if properties:
1504            body["properties"] = properties
1505        if self.storage_classes_desired():
1506            body["storage_classes_desired"] = self.storage_classes_desired()
1507        if trash_at:
1508            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1509            body["trash_at"] = t
1510        if preserve_version:
1511            body["preserve_version"] = preserve_version
1512
1513        if not self.committed():
1514            if self._has_remote_blocks:
1515                # Copy any remote blocks to the local cluster.
1516                self._copy_remote_blocks(remote_blocks={})
1517                self._has_remote_blocks = False
1518            if not self._has_collection_uuid():
1519                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1520            elif not self._has_local_collection_uuid():
1521                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1522
1523            self._my_block_manager().commit_all()
1524
1525            if merge:
1526                self.update()
1527
1528            text = self.manifest_text(strip=False)
1529            body['manifest_text'] = text
1530
1531            self._remember_api_response(self._my_api().collections().update(
1532                uuid=self._manifest_locator,
1533                body=body
1534                ).execute(num_retries=num_retries))
1535            self._manifest_text = self._api_response["manifest_text"]
1536            self._portable_data_hash = self._api_response["portable_data_hash"]
1537            self.set_committed(True)
1538        elif body:
1539            self._remember_api_response(self._my_api().collections().update(
1540                uuid=self._manifest_locator,
1541                body=body
1542                ).execute(num_retries=num_retries))
1543
1544        return self._manifest_text
1545
1546
1547    @must_be_writable
1548    @synchronized
1549    @retry_method
1550    def save_new(
1551            self,
1552            name: Optional[str]=None,
1553            create_collection_record: bool=True,
1554            owner_uuid: Optional[str]=None,
1555            properties: Optional[Properties]=None,
1556            storage_classes: Optional[StorageClasses]=None,
1557            trash_at: Optional[datetime.datetime]=None,
1558            ensure_unique_name: bool=False,
1559            num_retries: Optional[int]=None,
1560            preserve_version: bool=False,
1561    ):
1562        """Save collection to a new API record
1563
1564        This method finishes uploading new data blocks and (optionally)
1565        creates a new API collection record with the provided data. If a new
1566        record is created, this instance becomes associated with that record
1567        for future updates like `save()`. This method returns the saved
1568        collection manifest.
1569
1570        Arguments:
1571
1572        * name: str | None --- The `name` field to use on the new collection
1573          record. If not specified, a generic default name is generated.
1574
1575        * create_collection_record: bool --- If `True` (the default), creates a
1576          collection record on the API server. If `False`, the method finishes
1577          all data uploads and only returns the resulting collection manifest
1578          without sending it to the API server.
1579
1580        * owner_uuid: str | None --- The `owner_uuid` field to use on the
1581          new collection record.
1582
1583        * properties: dict[str, Any] | None --- The `properties` field to use on
1584          the new collection record.
1585
1586        * storage_classes: list[str] | None --- The
1587          `storage_classes_desired` field to use on the new collection record.
1588
1589        * trash_at: datetime.datetime | None --- The `trash_at` field to use
1590          on the new collection record.
1591
1592        * ensure_unique_name: bool --- This value is passed directly to the
1593          Arvados API when creating the collection record. If `True`, the API
1594          server may modify the submitted `name` to ensure the collection's
1595          `name`+`owner_uuid` combination is unique. If `False` (the default),
1596          if a collection already exists with this same `name`+`owner_uuid`
1597          combination, creating a collection record will raise a validation
1598          error.
1599
1600        * num_retries: int | None --- The number of times to retry reloading
1601          the collection's API record from the API server. If not specified,
1602          uses the `num_retries` provided when this instance was constructed.
1603
1604        * preserve_version: bool --- This value will be passed to directly
1605          to the underlying API call. If `True`, the Arvados API will
1606          preserve the versions of this collection both immediately before
1607          and after the update. If `True` when the API server is not
1608          configured with collection versioning, this method raises
1609          `arvados.errors.ArgumentError`.
1610        """
1611        if properties and type(properties) is not dict:
1612            raise errors.ArgumentError("properties must be dictionary type.")
1613
1614        if storage_classes and type(storage_classes) is not list:
1615            raise errors.ArgumentError("storage_classes must be list type.")
1616
1617        if trash_at and type(trash_at) is not datetime.datetime:
1618            raise errors.ArgumentError("trash_at must be datetime type.")
1619
1620        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1621            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1622
1623        if self._has_remote_blocks:
1624            # Copy any remote blocks to the local cluster.
1625            self._copy_remote_blocks(remote_blocks={})
1626            self._has_remote_blocks = False
1627
1628        if storage_classes:
1629            self._storage_classes_desired = storage_classes
1630
1631        self._my_block_manager().commit_all()
1632        text = self.manifest_text(strip=False)
1633
1634        if create_collection_record:
1635            if name is None:
1636                name = "New collection"
1637                ensure_unique_name = True
1638
1639            body = {"manifest_text": text,
1640                    "name": name,
1641                    "replication_desired": self.replication_desired}
1642            if owner_uuid:
1643                body["owner_uuid"] = owner_uuid
1644            if properties:
1645                body["properties"] = properties
1646            if self.storage_classes_desired():
1647                body["storage_classes_desired"] = self.storage_classes_desired()
1648            if trash_at:
1649                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1650                body["trash_at"] = t
1651            if preserve_version:
1652                body["preserve_version"] = preserve_version
1653
1654            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1655            text = self._api_response["manifest_text"]
1656
1657            self._manifest_locator = self._api_response["uuid"]
1658            self._portable_data_hash = self._api_response["portable_data_hash"]
1659
1660            self._manifest_text = text
1661            self.set_committed(True)
1662
1663        return text
1664
1665    _token_re = re.compile(r'(\S+)(\s+|$)')
1666    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1667    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1668
1669    def _unescape_manifest_path(self, path):
1670        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1671
1672    @synchronized
1673    def _import_manifest(self, manifest_text):
1674        """Import a manifest into a `Collection`.
1675
1676        :manifest_text:
1677          The manifest text to import from.
1678
1679        """
1680        if len(self) > 0:
1681            raise ArgumentError("Can only import manifest into an empty collection")
1682
1683        STREAM_NAME = 0
1684        BLOCKS = 1
1685        SEGMENTS = 2
1686
1687        stream_name = None
1688        state = STREAM_NAME
1689
1690        for token_and_separator in self._token_re.finditer(manifest_text):
1691            tok = token_and_separator.group(1)
1692            sep = token_and_separator.group(2)
1693
1694            if state == STREAM_NAME:
1695                # starting a new stream
1696                stream_name = self._unescape_manifest_path(tok)
1697                blocks = []
1698                segments = []
1699                streamoffset = 0
1700                state = BLOCKS
1701                self.find_or_create(stream_name, COLLECTION)
1702                continue
1703
1704            if state == BLOCKS:
1705                block_locator = self._block_re.match(tok)
1706                if block_locator:
1707                    blocksize = int(block_locator.group(1))
1708                    blocks.append(Range(tok, streamoffset, blocksize, 0))
1709                    streamoffset += blocksize
1710                else:
1711                    state = SEGMENTS
1712
1713            if state == SEGMENTS:
1714                file_segment = self._segment_re.match(tok)
1715                if file_segment:
1716                    pos = int(file_segment.group(1))
1717                    size = int(file_segment.group(2))
1718                    name = self._unescape_manifest_path(file_segment.group(3))
1719                    if name.split('/')[-1] == '.':
1720                        # placeholder for persisting an empty directory, not a real file
1721                        if len(name) > 2:
1722