arvados.collection

Tools to work with Arvados collections

This module provides high-level interfaces to create, read, and update Arvados collections. Most users will want to instantiate Collection objects, and use methods like Collection.open and Collection.mkdirs to read and write data in the collection. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4"""Tools to work with Arvados collections
   5
   6This module provides high-level interfaces to create, read, and update
   7Arvados collections. Most users will want to instantiate `Collection`
   8objects, and use methods like `Collection.open` and `Collection.mkdirs` to
   9read and write data in the collection. Refer to the Arvados Python SDK
  10cookbook for [an introduction to using the Collection class][cookbook].
  11
  12[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
  13"""
  14
  15import ciso8601
  16import datetime
  17import errno
  18import functools
  19import hashlib
  20import io
  21import logging
  22import os
  23import re
  24import sys
  25import threading
  26import time
  27
  28from collections import deque
  29from stat import *
  30
  31from ._internal import streams
  32from .api import ThreadSafeAPIClient
  33from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock, ADD, DEL, MOD, TOK, WRITE
  34from .keep import KeepLocator, KeepClient
  35import arvados.config as config
  36import arvados.errors as errors
  37import arvados.util
  38import arvados.events as events
  39from arvados.retry import retry_method
  40
  41from typing import (
  42    Any,
  43    Callable,
  44    Dict,
  45    IO,
  46    Iterator,
  47    List,
  48    Mapping,
  49    Optional,
  50    Tuple,
  51    Union,
  52)
  53
  54if sys.version_info < (3, 8):
  55    from typing_extensions import Literal
  56else:
  57    from typing import Literal
  58
# Module-level logger, registered under the name 'arvados.collection'.
_logger = logging.getLogger('arvados.collection')


FILE = "file"
"""`create_type` value for `Collection.find_or_create`"""
COLLECTION = "collection"
"""`create_type` value for `Collection.find_or_create`"""

# Type aliases used by the annotations in this module.
#
# ChangeList: a list of change tuples. Tuples tagged ADD or DEL carry a
# string and one Collection; tuples tagged MOD or TOK carry a string and two
# Collections (presumably path, then old and new objects --- confirm against
# the code that builds these lists).
ChangeList = List[Union[
    Tuple[Literal[ADD, DEL], str, 'Collection'],
    Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
]]
# ChangeType: any one of the four change tags imported from arvfile.
ChangeType = Literal[ADD, DEL, MOD, TOK]
# CollectionItem: what a collection can contain, a file or another collection.
CollectionItem = Union[ArvadosFile, 'Collection']
# ChangeCallback: callable taking (change type, collection, name, item).
ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
# CreateType: the two values accepted as `create_type` by `find_or_create`.
CreateType = Literal[COLLECTION, FILE]
Properties = Dict[str, Any]
StorageClasses = List[str]
  77
  78class CollectionBase(object):
  79    """Abstract base class for Collection classes
  80
  81    .. ATTENTION:: Internal
  82       This class is meant to be used by other parts of the SDK. User code
  83       should instantiate or subclass `Collection` or one of its subclasses
  84       directly.
  85    """
  86
  87    def __enter__(self):
  88        """Enter a context block with this collection instance"""
  89        return self
  90
  91    def __exit__(self, exc_type, exc_value, traceback):
  92        """Exit a context block with this collection instance"""
  93        pass
  94
  95    def _my_keep(self):
  96        if self._keep_client is None:
  97            self._keep_client = KeepClient(api_client=self._api_client,
  98                                           num_retries=self.num_retries)
  99        return self._keep_client
 100
 101    def stripped_manifest(self) -> str:
 102        """Create a copy of the collection manifest with only size hints
 103
 104        This method returns a string with the current collection's manifest
 105        text with all non-portable locator hints like permission hints and
 106        remote cluster hints removed. The only hints in the returned manifest
 107        will be size hints.
 108        """
 109        raw = self.manifest_text()
 110        clean = []
 111        for line in raw.split("\n"):
 112            fields = line.split()
 113            if fields:
 114                clean_fields = fields[:1] + [
 115                    (re.sub(r'\+[^\d][^\+]*', '', x)
 116                     if re.match(arvados.util.keep_locator_pattern, x)
 117                     else x)
 118                    for x in fields[1:]]
 119                clean += [' '.join(clean_fields), "\n"]
 120        return ''.join(clean)
 121
 122
 123class _WriterFile(_FileLikeObjectBase):
 124    def __init__(self, coll_writer, name):
 125        super(_WriterFile, self).__init__(name, 'wb')
 126        self.dest = coll_writer
 127
 128    def close(self):
 129        super(_WriterFile, self).close()
 130        self.dest.finish_current_file()
 131
 132    @_FileLikeObjectBase._before_close
 133    def write(self, data):
 134        self.dest.write(data)
 135
 136    @_FileLikeObjectBase._before_close
 137    def writelines(self, seq):
 138        for data in seq:
 139            self.write(data)
 140
 141    @_FileLikeObjectBase._before_close
 142    def flush(self):
 143        self.dest.flush_data()
 144
 145
 146class RichCollectionBase(CollectionBase):
 147    """Base class for Collection classes
 148
 149    .. ATTENTION:: Internal
 150       This class is meant to be used by other parts of the SDK. User code
 151       should instantiate or subclass `Collection` or one of its subclasses
 152       directly.
 153    """
 154
 155    def __init__(self, parent=None):
 156        self.parent = parent
 157        self._committed = False
 158        self._has_remote_blocks = False
 159        self._callback = None
 160        self._items = {}
 161
 162    def _my_api(self):
 163        raise NotImplementedError()
 164
 165    def _my_keep(self):
 166        raise NotImplementedError()
 167
 168    def _my_block_manager(self):
 169        raise NotImplementedError()
 170
 171    def writable(self) -> bool:
 172        """Indicate whether this collection object can be modified
 173
 174        This method returns `False` if this object is a `CollectionReader`,
 175        else `True`.
 176        """
 177        raise NotImplementedError()
 178
 179    def root_collection(self) -> 'Collection':
 180        """Get this collection's root collection object
 181
 182        If you open a subcollection with `Collection.find`, calling this method
 183        on that subcollection returns the source Collection object.
 184        """
 185        raise NotImplementedError()
 186
 187    def stream_name(self) -> str:
 188        """Get the name of the manifest stream represented by this collection
 189
 190        If you open a subcollection with `Collection.find`, calling this method
 191        on that subcollection returns the name of the stream you opened.
 192        """
 193        raise NotImplementedError()
 194
 195    @synchronized
 196    def has_remote_blocks(self) -> bool:
 197        """Indiciate whether the collection refers to remote data
 198
 199        Returns `True` if the collection manifest includes any Keep locators
 200        with a remote hint (`+R`), else `False`.
 201        """
 202        if self._has_remote_blocks:
 203            return True
 204        for item in self:
 205            if self[item].has_remote_blocks():
 206                return True
 207        return False
 208
 209    @synchronized
 210    def set_has_remote_blocks(self, val: bool) -> None:
 211        """Cache whether this collection refers to remote blocks
 212
 213        .. ATTENTION:: Internal
 214           This method is only meant to be used by other Collection methods.
 215
 216        Set this collection's cached "has remote blocks" flag to the given
 217        value.
 218        """
 219        self._has_remote_blocks = val
 220        if self.parent:
 221            self.parent.set_has_remote_blocks(val)
 222
 223    @must_be_writable
 224    @synchronized
 225    def find_or_create(
 226            self,
 227            path: str,
 228            create_type: CreateType,
 229    ) -> CollectionItem:
 230        """Get the item at the given path, creating it if necessary
 231
 232        If `path` refers to a stream in this collection, returns a
 233        corresponding `Subcollection` object. If `path` refers to a file in
 234        this collection, returns a corresponding
 235        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 236        this collection, then this method creates a new object and returns
 237        it, creating parent streams as needed. The type of object created is
 238        determined by the value of `create_type`.
 239
 240        Arguments:
 241
 242        * path: str --- The path to find or create within this collection.
 243
 244        * create_type: Literal[COLLECTION, FILE] --- The type of object to
 245          create at `path` if one does not exist. Passing `COLLECTION`
 246          creates a stream and returns the corresponding
 247          `Subcollection`. Passing `FILE` creates a new file and returns the
 248          corresponding `arvados.arvfile.ArvadosFile`.
 249        """
 250        pathcomponents = path.split("/", 1)
 251        if pathcomponents[0]:
 252            item = self._items.get(pathcomponents[0])
 253            if len(pathcomponents) == 1:
 254                if item is None:
 255                    # create new file
 256                    if create_type == COLLECTION:
 257                        item = Subcollection(self, pathcomponents[0])
 258                    else:
 259                        item = ArvadosFile(self, pathcomponents[0])
 260                    self._items[pathcomponents[0]] = item
 261                    self.set_committed(False)
 262                    self.notify(ADD, self, pathcomponents[0], item)
 263                return item
 264            else:
 265                if item is None:
 266                    # create new collection
 267                    item = Subcollection(self, pathcomponents[0])
 268                    self._items[pathcomponents[0]] = item
 269                    self.set_committed(False)
 270                    self.notify(ADD, self, pathcomponents[0], item)
 271                if isinstance(item, RichCollectionBase):
 272                    return item.find_or_create(pathcomponents[1], create_type)
 273                else:
 274                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 275        else:
 276            return self
 277
 278    @synchronized
 279    def find(self, path: str) -> CollectionItem:
 280        """Get the item at the given path
 281
 282        If `path` refers to a stream in this collection, returns a
 283        corresponding `Subcollection` object. If `path` refers to a file in
 284        this collection, returns a corresponding
 285        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 286        this collection, then this method raises `NotADirectoryError`.
 287
 288        Arguments:
 289
 290        * path: str --- The path to find or create within this collection.
 291        """
 292        if not path:
 293            raise errors.ArgumentError("Parameter 'path' is empty.")
 294
 295        pathcomponents = path.split("/", 1)
 296        if pathcomponents[0] == '':
 297            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 298
 299        item = self._items.get(pathcomponents[0])
 300        if item is None:
 301            return None
 302        elif len(pathcomponents) == 1:
 303            return item
 304        else:
 305            if isinstance(item, RichCollectionBase):
 306                if pathcomponents[1]:
 307                    return item.find(pathcomponents[1])
 308                else:
 309                    return item
 310            else:
 311                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 312
 313    @synchronized
 314    def mkdirs(self, path: str) -> 'Subcollection':
 315        """Create and return a subcollection at `path`
 316
 317        If `path` exists within this collection, raises `FileExistsError`.
 318        Otherwise, creates a stream at that path and returns the
 319        corresponding `Subcollection`.
 320        """
 321        if self.find(path) != None:
 322            raise IOError(errno.EEXIST, "Directory or file exists", path)
 323
 324        return self.find_or_create(path, COLLECTION)
 325
 326    def open(
 327            self,
 328            path: str,
 329            mode: str="r",
 330            encoding: Optional[str]=None
 331    ) -> IO:
 332        """Open a file-like object within the collection
 333
 334        This method returns a file-like object that can read and/or write the
 335        file located at `path` within the collection. If you attempt to write
 336        a `path` that does not exist, the file is created with `find_or_create`.
 337        If the file cannot be opened for any other reason, this method raises
 338        `OSError` with an appropriate errno.
 339
 340        Arguments:
 341
 342        * path: str --- The path of the file to open within this collection
 343
 344        * mode: str --- The mode to open this file. Supports all the same
 345          values as `builtins.open`.
 346
 347        * encoding: str | None --- The text encoding of the file. Only used
 348          when the file is opened in text mode. The default is
 349          platform-dependent.
 350
 351        """
 352        if not re.search(r'^[rwa][bt]?\+?$', mode):
 353            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 354
 355        if mode[0] == 'r' and '+' not in mode:
 356            fclass = ArvadosFileReader
 357            arvfile = self.find(path)
 358        elif not self.writable():
 359            raise IOError(errno.EROFS, "Collection is read only")
 360        else:
 361            fclass = ArvadosFileWriter
 362            arvfile = self.find_or_create(path, FILE)
 363
 364        if arvfile is None:
 365            raise IOError(errno.ENOENT, "File not found", path)
 366        if not isinstance(arvfile, ArvadosFile):
 367            raise IOError(errno.EISDIR, "Is a directory", path)
 368
 369        if mode[0] == 'w':
 370            arvfile.truncate(0)
 371
 372        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
 373        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
 374        if 'b' not in mode:
 375            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
 376            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
 377        return f
 378
 379    def modified(self) -> bool:
 380        """Indicate whether this collection has an API server record
 381
 382        Returns `False` if this collection corresponds to a record loaded from
 383        the API server, `True` otherwise.
 384        """
 385        return not self.committed()
 386
 387    @synchronized
 388    def committed(self):
 389        """Indicate whether this collection has an API server record
 390
 391        Returns `True` if this collection corresponds to a record loaded from
 392        the API server, `False` otherwise.
 393        """
 394        return self._committed
 395
 396    @synchronized
 397    def set_committed(self, value: bool=True):
 398        """Cache whether this collection has an API server record
 399
 400        .. ATTENTION:: Internal
 401           This method is only meant to be used by other Collection methods.
 402
 403        Set this collection's cached "committed" flag to the given
 404        value and propagates it as needed.
 405        """
 406        if value == self._committed:
 407            return
 408        if value:
 409            for k,v in self._items.items():
 410                v.set_committed(True)
 411            self._committed = True
 412        else:
 413            self._committed = False
 414            if self.parent is not None:
 415                self.parent.set_committed(False)
 416
 417    @synchronized
 418    def __iter__(self) -> Iterator[str]:
 419        """Iterate names of streams and files in this collection
 420
 421        This method does not recurse. It only iterates the contents of this
 422        collection's corresponding stream.
 423        """
 424        return iter(self._items)
 425
 426    @synchronized
 427    def __getitem__(self, k: str) -> CollectionItem:
 428        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
 429
 430        This method does not recurse. If you want to search a path, use
 431        `RichCollectionBase.find` instead.
 432        """
 433        return self._items[k]
 434
 435    @synchronized
 436    def __contains__(self, k: str) -> bool:
 437        """Indicate whether this collection has an item with this name
 438
 439        This method does not recurse. It you want to check a path, use
 440        `RichCollectionBase.exists` instead.
 441        """
 442        return k in self._items
 443
 444    @synchronized
 445    def __len__(self):
 446        """Get the number of items directly contained in this collection
 447
 448        This method does not recurse. It only counts the streams and files
 449        in this collection's corresponding stream.
 450        """
 451        return len(self._items)
 452
 453    @must_be_writable
 454    @synchronized
 455    def __delitem__(self, p: str) -> None:
 456        """Delete an item from this collection's stream
 457
 458        This method does not recurse. If you want to remove an item by a
 459        path, use `RichCollectionBase.remove` instead.
 460        """
 461        del self._items[p]
 462        self.set_committed(False)
 463        self.notify(DEL, self, p, None)
 464
 465    @synchronized
 466    def keys(self) -> Iterator[str]:
 467        """Iterate names of streams and files in this collection
 468
 469        This method does not recurse. It only iterates the contents of this
 470        collection's corresponding stream.
 471        """
 472        return self._items.keys()
 473
 474    @synchronized
 475    def values(self) -> List[CollectionItem]:
 476        """Get a list of objects in this collection's stream
 477
 478        The return value includes a `Subcollection` for every stream, and an
 479        `arvados.arvfile.ArvadosFile` for every file, directly within this
 480        collection's stream.  This method does not recurse.
 481        """
 482        return list(self._items.values())
 483
 484    @synchronized
 485    def items(self) -> List[Tuple[str, CollectionItem]]:
 486        """Get a list of `(name, object)` tuples from this collection's stream
 487
 488        The return value includes a `Subcollection` for every stream, and an
 489        `arvados.arvfile.ArvadosFile` for every file, directly within this
 490        collection's stream.  This method does not recurse.
 491        """
 492        return list(self._items.items())
 493
 494    def exists(self, path: str) -> bool:
 495        """Indicate whether this collection includes an item at `path`
 496
 497        This method returns `True` if `path` refers to a stream or file within
 498        this collection, else `False`.
 499
 500        Arguments:
 501
 502        * path: str --- The path to check for existence within this collection
 503        """
 504        return self.find(path) is not None
 505
 506    @must_be_writable
 507    @synchronized
 508    def remove(self, path: str, recursive: bool=False) -> None:
 509        """Remove the file or stream at `path`
 510
 511        Arguments:
 512
 513        * path: str --- The path of the item to remove from the collection
 514
 515        * recursive: bool --- Controls the method's behavior if `path` refers
 516          to a nonempty stream. If `False` (the default), this method raises
 517          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
 518          items under the stream.
 519        """
 520        if not path:
 521            raise errors.ArgumentError("Parameter 'path' is empty.")
 522
 523        pathcomponents = path.split("/", 1)
 524        item = self._items.get(pathcomponents[0])
 525        if item is None:
 526            raise IOError(errno.ENOENT, "File not found", path)
 527        if len(pathcomponents) == 1:
 528            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
 529                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
 530            deleteditem = self._items[pathcomponents[0]]
 531            del self._items[pathcomponents[0]]
 532            self.set_committed(False)
 533            self.notify(DEL, self, pathcomponents[0], deleteditem)
 534        else:
 535            item.remove(pathcomponents[1], recursive=recursive)
 536
 537    def _clonefrom(self, source):
 538        for k,v in source.items():
 539            self._items[k] = v.clone(self, k)
 540
 541    def clone(self):
 542        raise NotImplementedError()
 543
 544    @must_be_writable
 545    @synchronized
 546    def add(
 547            self,
 548            source_obj: CollectionItem,
 549            target_name: str,
 550            overwrite: bool=False,
 551            reparent: bool=False,
 552    ) -> None:
 553        """Copy or move a file or subcollection object to this collection
 554
 555        Arguments:
 556
 557        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
 558          to add to this collection
 559
 560        * target_name: str --- The path inside this collection where
 561          `source_obj` should be added.
 562
 563        * overwrite: bool --- Controls the behavior of this method when the
 564          collection already contains an object at `target_name`. If `False`
 565          (the default), this method will raise `FileExistsError`. If `True`,
 566          the object at `target_name` will be replaced with `source_obj`.
 567
 568        * reparent: bool --- Controls whether this method copies or moves
 569          `source_obj`. If `False` (the default), `source_obj` is copied into
 570          this collection. If `True`, `source_obj` is moved into this
 571          collection.
 572        """
 573        if target_name in self and not overwrite:
 574            raise IOError(errno.EEXIST, "File already exists", target_name)
 575
 576        modified_from = None
 577        if target_name in self:
 578            modified_from = self[target_name]
 579
 580        # Actually make the move or copy.
 581        if reparent:
 582            source_obj._reparent(self, target_name)
 583            item = source_obj
 584        else:
 585            item = source_obj.clone(self, target_name)
 586
 587        self._items[target_name] = item
 588        self.set_committed(False)
 589        if not self._has_remote_blocks and source_obj.has_remote_blocks():
 590            self.set_has_remote_blocks(True)
 591
 592        if modified_from:
 593            self.notify(MOD, self, target_name, (modified_from, item))
 594        else:
 595            self.notify(ADD, self, target_name, item)
 596
 597    def _get_src_target(self, source, target_path, source_collection, create_dest):
 598        if source_collection is None:
 599            source_collection = self
 600
 601        # Find the object
 602        if isinstance(source, str):
 603            source_obj = source_collection.find(source)
 604            if source_obj is None:
 605                raise IOError(errno.ENOENT, "File not found", source)
 606            sourcecomponents = source.split("/")
 607        else:
 608            source_obj = source
 609            sourcecomponents = None
 610
 611        # Find parent collection the target path
 612        targetcomponents = target_path.split("/")
 613
 614        # Determine the name to use.
 615        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 616
 617        if not target_name:
 618            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 619
 620        if create_dest:
 621            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 622        else:
 623            if len(targetcomponents) > 1:
 624                target_dir = self.find("/".join(targetcomponents[0:-1]))
 625            else:
 626                target_dir = self
 627
 628        if target_dir is None:
 629            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 630
 631        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
 632            target_dir = target_dir[target_name]
 633            target_name = sourcecomponents[-1]
 634
 635        return (source_obj, target_dir, target_name)
 636
 637    @must_be_writable
 638    @synchronized
 639    def copy(
 640            self,
 641            source: Union[str, CollectionItem],
 642            target_path: str,
 643            source_collection: Optional['RichCollectionBase']=None,
 644            overwrite: bool=False,
 645    ) -> None:
 646        """Copy a file or subcollection object to this collection
 647
 648        Arguments:
 649
 650        * source: str | arvados.arvfile.ArvadosFile |
 651          arvados.collection.Subcollection --- The file or subcollection to
 652          add to this collection. If `source` is a str, the object will be
 653          found by looking up this path from `source_collection` (see
 654          below).
 655
 656        * target_path: str --- The path inside this collection where the
 657          source object should be added.
 658
 659        * source_collection: arvados.collection.Collection | None --- The
 660          collection to find the source object from when `source` is a
 661          path. Defaults to the current collection (`self`).
 662
 663        * overwrite: bool --- Controls the behavior of this method when the
 664          collection already contains an object at `target_path`. If `False`
 665          (the default), this method will raise `FileExistsError`. If `True`,
 666          the object at `target_path` will be replaced with `source_obj`.
 667        """
 668        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 669        target_dir.add(source_obj, target_name, overwrite, False)
 670
 671    @must_be_writable
 672    @synchronized
 673    def rename(
 674            self,
 675            source: Union[str, CollectionItem],
 676            target_path: str,
 677            source_collection: Optional['RichCollectionBase']=None,
 678            overwrite: bool=False,
 679    ) -> None:
 680        """Move a file or subcollection object to this collection
 681
 682        Arguments:
 683
 684        * source: str | arvados.arvfile.ArvadosFile |
 685          arvados.collection.Subcollection --- The file or subcollection to
 686          add to this collection. If `source` is a str, the object will be
 687          found by looking up this path from `source_collection` (see
 688          below).
 689
 690        * target_path: str --- The path inside this collection where the
 691          source object should be added.
 692
 693        * source_collection: arvados.collection.Collection | None --- The
 694          collection to find the source object from when `source` is a
 695          path. Defaults to the current collection (`self`).
 696
 697        * overwrite: bool --- Controls the behavior of this method when the
 698          collection already contains an object at `target_path`. If `False`
 699          (the default), this method will raise `FileExistsError`. If `True`,
 700          the object at `target_path` will be replaced with `source_obj`.
 701        """
 702        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 703        if not source_obj.writable():
 704            raise IOError(errno.EROFS, "Source collection is read only", source)
 705        target_dir.add(source_obj, target_name, overwrite, True)
 706
 707    def portable_manifest_text(self, stream_name: str=".") -> str:
 708        """Get the portable manifest text for this collection
 709
 710        The portable manifest text is normalized, and does not include access
 711        tokens. This method does not flush outstanding blocks to Keep.
 712
 713        Arguments:
 714
 715        * stream_name: str --- The name to use for this collection's stream in
 716          the generated manifest. Default `'.'`.
 717        """
 718        return self._get_manifest_text(stream_name, True, True)
 719
 720    @synchronized
 721    def manifest_text(
 722            self,
 723            stream_name: str=".",
 724            strip: bool=False,
 725            normalize: bool=False,
 726            only_committed: bool=False,
 727    ) -> str:
 728        """Get the manifest text for this collection
 729
 730        Arguments:
 731
 732        * stream_name: str --- The name to use for this collection's stream in
 733          the generated manifest. Default `'.'`.
 734
 735        * strip: bool --- Controls whether or not the returned manifest text
 736          includes access tokens. If `False` (the default), the manifest text
 737          will include access tokens. If `True`, the manifest text will not
 738          include access tokens.
 739
 740        * normalize: bool --- Controls whether or not the returned manifest
 741          text is normalized. Default `False`.
 742
 743        * only_committed: bool --- Controls whether or not this method uploads
 744          pending data to Keep before building and returning the manifest text.
 745          If `False` (the default), this method will finish uploading all data
 746          to Keep, then return the final manifest. If `True`, this method will
 747          build and return a manifest that only refers to the data that has
 748          finished uploading at the time this method was called.
 749        """
 750        if not only_committed:
 751            self._my_block_manager().commit_all()
 752        return self._get_manifest_text(stream_name, strip, normalize,
 753                                       only_committed=only_committed)
 754
 755    @synchronized
 756    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
 757        """Get the manifest text for this collection, sub collections and files.
 758
 759        :stream_name:
 760          Name to use for this stream (directory)
 761
 762        :strip:
 763          If True, remove signing tokens from block locators if present.
 764          If False (default), block locators are left unchanged.
 765
 766        :normalize:
 767          If True, always export the manifest text in normalized form
 768          even if the Collection is not modified.  If False (default) and the collection
 769          is not modified, return the original manifest text even if it is not
 770          in normalized form.
 771
 772        :only_committed:
 773          If True, only include blocks that were already committed to Keep.
 774
 775        """
 776
 777        if not self.committed() or self._manifest_text is None or normalize:
 778            stream = {}
 779            buf = []
 780            sorted_keys = sorted(self.keys())
 781            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
 782                # Create a stream per file `k`
 783                arvfile = self[filename]
 784                filestream = []
 785                for segment in arvfile.segments():
 786                    loc = segment.locator
 787                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
 788                        if only_committed:
 789                            continue
 790                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
 791                    if strip:
 792                        loc = KeepLocator(loc).stripped()
 793                    filestream.append(streams.LocatorAndRange(
 794                        loc,
 795                        KeepLocator(loc).size,
 796                        segment.segment_offset,
 797                        segment.range_size,
 798                    ))
 799                stream[filename] = filestream
 800            if stream:
 801                buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n")
 802            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
 803                buf.append(self[dirname].manifest_text(
 804                    stream_name=os.path.join(stream_name, dirname),
 805                    strip=strip, normalize=True, only_committed=only_committed))
 806            return "".join(buf)
 807        else:
 808            if strip:
 809                return self.stripped_manifest()
 810            else:
 811                return self._manifest_text
 812
 813    @synchronized
 814    def _copy_remote_blocks(self, remote_blocks={}):
 815        """Scan through the entire collection and ask Keep to copy remote blocks.
 816
 817        When accessing a remote collection, blocks will have a remote signature
 818        (+R instead of +A). Collect these signatures and request Keep to copy the
 819        blocks to the local cluster, returning local (+A) signatures.
 820
 821        :remote_blocks:
 822          Shared cache of remote to local block mappings. This is used to avoid
 823          doing extra work when blocks are shared by more than one file in
 824          different subdirectories.
 825
 826        """
 827        for item in self:
 828            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
 829        return remote_blocks
 830
 831    @synchronized
 832    def diff(
 833            self,
 834            end_collection: 'RichCollectionBase',
 835            prefix: str=".",
 836            holding_collection: Optional['Collection']=None,
 837    ) -> ChangeList:
 838        """Build a list of differences between this collection and another
 839
 840        Arguments:
 841
 842        * end_collection: arvados.collection.RichCollectionBase --- A
 843          collection object with the desired end state. The returned diff
 844          list will describe how to go from the current collection object
 845          `self` to `end_collection`.
 846
 847        * prefix: str --- The name to use for this collection's stream in
 848          the diff list. Default `'.'`.
 849
 850        * holding_collection: arvados.collection.Collection | None --- A
 851          collection object used to hold objects for the returned diff
 852          list. By default, a new empty collection is created.
 853        """
 854        changes = []
 855        if holding_collection is None:
 856            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
 857        for k in self:
 858            if k not in end_collection:
 859               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
 860        for k in end_collection:
 861            if k in self:
 862                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
 863                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
 864                elif end_collection[k] != self[k]:
 865                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 866                else:
 867                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 868            else:
 869                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
 870        return changes
 871
 872    @must_be_writable
 873    @synchronized
 874    def apply(self, changes: ChangeList) -> None:
 875        """Apply a list of changes from to this collection
 876
 877        This method takes a list of changes generated by
 878        `RichCollectionBase.diff` and applies it to this
 879        collection. Afterward, the state of this collection object will
 880        match the state of `end_collection` passed to `diff`. If a change
 881        conflicts with a local change, it will be saved to an alternate path
 882        indicating the conflict.
 883
 884        Arguments:
 885
 886        * changes: arvados.collection.ChangeList --- The list of differences
 887          generated by `RichCollectionBase.diff`.
 888        """
 889        if changes:
 890            self.set_committed(False)
 891        for change in changes:
 892            event_type = change[0]
 893            path = change[1]
 894            initial = change[2]
 895            local = self.find(path)
 896            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
 897                                                                    time.gmtime()))
 898            if event_type == ADD:
 899                if local is None:
 900                    # No local file at path, safe to copy over new file
 901                    self.copy(initial, path)
 902                elif local is not None and local != initial:
 903                    # There is already local file and it is different:
 904                    # save change to conflict file.
 905                    self.copy(initial, conflictpath)
 906            elif event_type == MOD or event_type == TOK:
 907                final = change[3]
 908                if local == initial:
 909                    # Local matches the "initial" item so it has not
 910                    # changed locally and is safe to update.
 911                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
 912                        # Replace contents of local file with new contents
 913                        local.replace_contents(final)
 914                    else:
 915                        # Overwrite path with new item; this can happen if
 916                        # path was a file and is now a collection or vice versa
 917                        self.copy(final, path, overwrite=True)
 918                elif event_type == MOD:
 919                    # Local doesn't match the "start" value or local
 920                    # is missing (presumably deleted) so save change
 921                    # to conflict file.  Don't do this for TOK events
 922                    # which means the file didn't change but only had
 923                    # tokens updated.
 924                    self.copy(final, conflictpath)
 925            elif event_type == DEL:
 926                if local == initial:
 927                    # Local item matches "initial" value, so it is safe to remove.
 928                    self.remove(path, recursive=True)
 929                # else, the file is modified or already removed, in either
 930                # case we don't want to try to remove it.
 931
 932    def portable_data_hash(self) -> str:
 933        """Get the portable data hash for this collection's manifest"""
 934        if self._manifest_locator and self.committed():
 935            # If the collection is already saved on the API server, and it's committed
 936            # then return API server's PDH response.
 937            return self._portable_data_hash
 938        else:
 939            stripped = self.portable_manifest_text().encode()
 940            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 941
 942    @synchronized
 943    def subscribe(self, callback: ChangeCallback) -> None:
 944        """Set a notify callback for changes to this collection
 945
 946        Arguments:
 947
 948        * callback: arvados.collection.ChangeCallback --- The callable to
 949          call each time the collection is changed.
 950        """
 951        if self._callback is None:
 952            self._callback = callback
 953        else:
 954            raise errors.ArgumentError("A callback is already set on this collection.")
 955
 956    @synchronized
 957    def unsubscribe(self) -> None:
 958        """Remove any notify callback set for changes to this collection"""
 959        if self._callback is not None:
 960            self._callback = None
 961
 962    @synchronized
 963    def notify(
 964            self,
 965            event: ChangeType,
 966            collection: 'RichCollectionBase',
 967            name: str,
 968            item: CollectionItem,
 969    ) -> None:
 970        """Notify any subscribed callback about a change to this collection
 971
 972        .. ATTENTION:: Internal
 973           This method is only meant to be used by other Collection methods.
 974
 975        If a callback has been registered with `RichCollectionBase.subscribe`,
 976        it will be called with information about a change to this collection.
 977        Then this notification will be propagated to this collection's root.
 978
 979        Arguments:
 980
 981        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 982          the collection.
 983
 984        * collection: arvados.collection.RichCollectionBase --- The
 985          collection that was modified.
 986
 987        * name: str --- The name of the file or stream within `collection` that
 988          was modified.
 989
 990        * item: arvados.arvfile.ArvadosFile |
 991          arvados.collection.Subcollection --- For ADD events, the new
 992          contents at `name` within `collection`; for DEL events, the
 993          item that was removed.  For MOD and TOK events, a 2-tuple of
 994          the previous item and the new item (may be the same object
 995          or different, depending on whether the action involved it
 996          being modified in place or replaced).
 997
 998        """
 999        if self._callback:
1000            self._callback(event, collection, name, item)
1001        self.root_collection().notify(event, collection, name, item)
1002
1003    @synchronized
1004    def __eq__(self, other: Any) -> bool:
1005        """Indicate whether this collection object is equal to another"""
1006        if other is self:
1007            return True
1008        if not isinstance(other, RichCollectionBase):
1009            return False
1010        if len(self._items) != len(other):
1011            return False
1012        for k in self._items:
1013            if k not in other:
1014                return False
1015            if self._items[k] != other[k]:
1016                return False
1017        return True
1018
1019    def __ne__(self, other: Any) -> bool:
1020        """Indicate whether this collection object is not equal to another"""
1021        return not self.__eq__(other)
1022
1023    @synchronized
1024    def flush(self) -> None:
1025        """Upload any pending data to Keep"""
1026        for e in self.values():
1027            e.flush()
1028
1029
1030class Collection(RichCollectionBase):
1031    """Read and manipulate an Arvados collection
1032
1033    This class provides a high-level interface to create, read, and update
1034    Arvados collections and their contents. Refer to the Arvados Python SDK
1035    cookbook for [an introduction to using the Collection class][cookbook].
1036
1037    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1038    """
1039
1040    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1041                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1042                 keep_client: Optional['arvados.keep.KeepClient']=None,
1043                 num_retries: int=10,
1044                 parent: Optional['Collection']=None,
1045                 apiconfig: Optional[Mapping[str, str]]=None,
1046                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1047                 replication_desired: Optional[int]=None,
1048                 storage_classes_desired: Optional[List[str]]=None,
1049                 put_threads: Optional[int]=None):
1050        """Initialize a Collection object
1051
1052        Arguments:
1053
1054        * manifest_locator_or_text: str | None --- This string can contain a
1055          collection manifest text, portable data hash, or UUID. When given a
1056          portable data hash or UUID, this instance will load a collection
1057          record from the API server. Otherwise, this instance will represent a
1058          new collection without an API server record. The default value `None`
1059          instantiates a new collection with an empty manifest.
1060
1061        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1062          Arvados API client object this instance uses to make requests. If
1063          none is given, this instance creates its own client using the
1064          settings from `apiconfig` (see below). If your client instantiates
1065          many Collection objects, you can help limit memory utilization by
1066          calling `arvados.api.api` to construct an
1067          `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
1068          for every Collection.
1069
1070        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1071          object this instance uses to make requests. If none is given, this
1072          instance creates its own client using its `api_client`.
1073
1074        * num_retries: int --- The number of times that client requests are
1075          retried. Default 10.
1076
1077        * parent: arvados.collection.Collection | None --- The parent Collection
1078          object of this instance, if any. This argument is primarily used by
1079          other Collection methods; user client code shouldn't need to use it.
1080
1081        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1082          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1083          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1084          Collection object constructs one from these settings. If no
1085          mapping is provided, calls `arvados.config.settings` to get these
1086          parameters from user configuration.
1087
1088        * block_manager: arvados.arvfile._BlockManager | None --- The
1089          _BlockManager object used by this instance to coordinate reading
1090          and writing Keep data blocks. If none is given, this instance
1091          constructs its own. This argument is primarily used by other
1092          Collection methods; user client code shouldn't need to use it.
1093
1094        * replication_desired: int | None --- This controls both the value of
1095          the `replication_desired` field on API collection records saved by
1096          this class, as well as the number of Keep services that the object
1097          writes new data blocks to. If none is given, uses the default value
1098          configured for the cluster.
1099
1100        * storage_classes_desired: list[str] | None --- This controls both
1101          the value of the `storage_classes_desired` field on API collection
1102          records saved by this class, as well as selecting which specific
1103          Keep services the object writes new data blocks to. If none is
1104          given, defaults to an empty list.
1105
1106        * put_threads: int | None --- The number of threads to run
1107          simultaneously to upload data blocks to Keep. This value is used when
1108          building a new `block_manager`. It is unused when a `block_manager`
1109          is provided.
1110        """
1111
1112        if storage_classes_desired and type(storage_classes_desired) is not list:
1113            raise errors.ArgumentError("storage_classes_desired must be list type.")
1114
1115        super(Collection, self).__init__(parent)
1116        self._api_client = api_client
1117        self._keep_client = keep_client
1118
1119        # Use the keep client from ThreadSafeAPIClient
1120        if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
1121            self._keep_client = self._api_client.keep
1122
1123        self._block_manager = block_manager
1124        self.replication_desired = replication_desired
1125        self._storage_classes_desired = storage_classes_desired
1126        self.put_threads = put_threads
1127
1128        if apiconfig:
1129            self._config = apiconfig
1130        else:
1131            self._config = config.settings()
1132
1133        self.num_retries = num_retries
1134        self._manifest_locator = None
1135        self._manifest_text = None
1136        self._portable_data_hash = None
1137        self._api_response = None
1138        self._token_refresh_timestamp = 0
1139
1140        self.lock = threading.RLock()
1141        self.events = None
1142
1143        if manifest_locator_or_text:
1144            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1145                self._manifest_locator = manifest_locator_or_text
1146            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1147                self._manifest_locator = manifest_locator_or_text
1148                if not self._has_local_collection_uuid():
1149                    self._has_remote_blocks = True
1150            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1151                self._manifest_text = manifest_locator_or_text
1152                if '+R' in self._manifest_text:
1153                    self._has_remote_blocks = True
1154            else:
1155                raise errors.ArgumentError(
1156                    "Argument to CollectionReader is not a manifest or a collection UUID")
1157
1158            try:
1159                self._populate()
1160            except errors.SyntaxError as e:
1161                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1162
1163    def storage_classes_desired(self) -> List[str]:
1164        """Get this collection's `storage_classes_desired` value"""
1165        return self._storage_classes_desired or []
1166
1167    def root_collection(self) -> 'Collection':
1168        return self
1169
1170    def get_properties(self) -> Properties:
1171        """Get this collection's properties
1172
1173        This method always returns a dict. If this collection object does not
1174        have an associated API record, or that record does not have any
1175        properties set, this method returns an empty dict.
1176        """
1177        if self._api_response and self._api_response["properties"]:
1178            return self._api_response["properties"]
1179        else:
1180            return {}
1181
1182    def get_trash_at(self) -> Optional[datetime.datetime]:
1183        """Get this collection's `trash_at` field
1184
1185        This method parses the `trash_at` field of the collection's API
1186        record and returns a datetime from it. If that field is not set, or
1187        this collection object does not have an associated API record,
1188        returns None.
1189        """
1190        if self._api_response and self._api_response["trash_at"]:
1191            try:
1192                return ciso8601.parse_datetime(self._api_response["trash_at"])
1193            except ValueError:
1194                return None
1195        else:
1196            return None
1197
1198    def stream_name(self) -> str:
1199        return "."
1200
1201    def writable(self) -> bool:
1202        return True
1203
1204    @synchronized
1205    @retry_method
1206    def update(
1207            self,
1208            other: Optional['Collection']=None,
1209            num_retries: Optional[int]=None,
1210    ) -> None:
1211        """Merge another collection's contents into this one
1212
1213        This method compares the manifest of this collection instance with
1214        another, then updates this instance's manifest with changes from the
1215        other, renaming files to flag conflicts where necessary.
1216
1217        When called without any arguments, this method reloads the collection's
1218        API record, and updates this instance with any changes that have
1219        appeared server-side. If this instance does not have a corresponding
1220        API record, this method raises `arvados.errors.ArgumentError`.
1221
1222        Arguments:
1223
1224        * other: arvados.collection.Collection | None --- The collection
1225          whose contents should be merged into this instance. When not
1226          provided, this method reloads this collection's API record and
1227          constructs a Collection object from it.  If this instance does not
1228          have a corresponding API record, this method raises
1229          `arvados.errors.ArgumentError`.
1230
1231        * num_retries: int | None --- The number of times to retry reloading
1232          the collection's API record from the API server. If not specified,
1233          uses the `num_retries` provided when this instance was constructed.
1234        """
1235
1236        token_refresh_period = 60*60
1237        time_since_last_token_refresh = (time.time() - self._token_refresh_timestamp)
1238        upstream_response = None
1239
1240        if other is None:
1241            if self._manifest_locator is None:
1242                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1243
1244            if re.match(arvados.util.portable_data_hash_pattern, self._manifest_locator) and time_since_last_token_refresh < token_refresh_period:
1245                return
1246
1247            upstream_response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1248            other = CollectionReader(upstream_response["manifest_text"])
1249
1250        if self.committed():
1251            # 1st case, no local changes, content is the same
1252            if self.portable_data_hash() == other.portable_data_hash() and time_since_last_token_refresh < token_refresh_period:
1253                # No difference in content.  Remember the API record
1254                # (metadata such as name or properties may have changed)
1255                # but don't update the token refresh timestamp.
1256                if upstream_response is not None:
1257                    self._remember_api_response(upstream_response)
1258                return
1259
1260            # 2nd case, no local changes, but either upstream changed
1261            # or we want to refresh tokens.
1262
1263            self.apply(self.diff(other))
1264            if upstream_response is not None:
1265                self._remember_api_response(upstream_response)
1266            self._update_token_timestamp()
1267            self.set_committed(True)
1268            return
1269
1270        # 3rd case, upstream changed, but we also have uncommitted
1271        # changes that we want to incorporate so they don't get lost.
1272
1273        # _manifest_text stores the text from last time we received a
1274        # record from the API server.  This is the state of the
1275        # collection before our uncommitted changes.
1276        baseline = Collection(self._manifest_text)
1277
1278        # Get the set of changes between our baseline and the other
1279        # collection and apply them to self.
1280        #
1281        # If a file was modified in both 'self' and 'other', the
1282        # 'apply' method keeps the contents of 'self' and creates a
1283        # conflict file with the contents of 'other'.
1284        self.apply(baseline.diff(other))
1285
1286        # Remember the new baseline, changes to a file
1287        if upstream_response is not None:
1288            self._remember_api_response(upstream_response)
1289
1290
1291    @synchronized
1292    def _my_api(self):
1293        if self._api_client is None:
1294            self._api_client = ThreadSafeAPIClient(self._config, version='v1')
1295            if self._keep_client is None:
1296                self._keep_client = self._api_client.keep
1297        return self._api_client
1298
1299    @synchronized
1300    def _my_keep(self):
1301        if self._keep_client is None:
1302            if self._api_client is None:
1303                self._my_api()
1304            else:
1305                self._keep_client = KeepClient(api_client=self._api_client)
1306        return self._keep_client
1307
1308    @synchronized
1309    def _my_block_manager(self):
1310        if self._block_manager is None:
1311            copies = (self.replication_desired or
1312                      self._my_api()._rootDesc.get('defaultCollectionReplication',
1313                                                   2))
1314            self._block_manager = _BlockManager(self._my_keep(),
1315                                                copies=copies,
1316                                                put_threads=self.put_threads,
1317                                                num_retries=self.num_retries,
1318                                                storage_classes_func=self.storage_classes_desired)
1319        return self._block_manager
1320
1321    def _remember_api_response(self, response):
1322        self._api_response = response
1323        self._manifest_text = self._api_response['manifest_text']
1324        self._portable_data_hash = self._api_response['portable_data_hash']
1325
1326    def _update_token_timestamp(self):
1327        self._token_refresh_timestamp = time.time()
1328
1329    def _populate_from_api_server(self):
1330        # As in KeepClient itself, we must wait until the last
1331        # possible moment to instantiate an API client, in order to
1332        # avoid tripping up clients that don't have access to an API
1333        # server.  If we do build one, make sure our Keep client uses
1334        # it.  If instantiation fails, we'll fall back to the except
1335        # clause, just like any other Collection lookup
1336        # failure. Return an exception, or None if successful.
1337        self._remember_api_response(self._my_api().collections().get(
1338            uuid=self._manifest_locator).execute(
1339                num_retries=self.num_retries))
1340
1341        # If not overriden via kwargs, we should try to load the
1342        # replication_desired and storage_classes_desired from the API server
1343        if self.replication_desired is None:
1344            self.replication_desired = self._api_response.get('replication_desired', None)
1345        if self._storage_classes_desired is None:
1346            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1347
1348    def _populate(self):
1349        if self._manifest_text is None:
1350            if self._manifest_locator is None:
1351                return
1352            else:
1353                self._populate_from_api_server()
1354        self._baseline_manifest = self._manifest_text
1355        self._import_manifest(self._manifest_text)
1356
1357    def _has_collection_uuid(self):
1358        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1359
1360    def _has_local_collection_uuid(self):
1361        return self._has_collection_uuid and \
1362            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1363
1364    def __enter__(self):
1365        return self
1366
1367    def __exit__(self, exc_type, exc_value, traceback):
1368        """Exit a context with this collection instance
1369
1370        If no exception was raised inside the context block, and this
1371        collection is writable and has a corresponding API record, that
1372        record will be updated to match the state of this instance at the end
1373        of the block.
1374        """
1375        if exc_type is None:
1376            if self.writable() and self._has_collection_uuid():
1377                self.save()
1378        self.stop_threads()
1379
1380    def stop_threads(self) -> None:
1381        """Stop background Keep upload/download threads"""
1382        if self._block_manager is not None:
1383            self._block_manager.stop_threads()
1384
1385    @synchronized
1386    def manifest_locator(self) -> Optional[str]:
1387        """Get this collection's manifest locator, if any
1388
1389        * If this collection instance is associated with an API record with a
1390          UUID, return that.
1391        * Otherwise, if this collection instance was loaded from an API record
1392          by portable data hash, return that.
1393        * Otherwise, return `None`.
1394        """
1395        return self._manifest_locator
1396
1397    @synchronized
1398    def clone(
1399            self,
1400            new_parent: Optional['Collection']=None,
1401            new_name: Optional[str]=None,
1402            readonly: bool=False,
1403            new_config: Optional[Mapping[str, str]]=None,
1404    ) -> 'Collection':
1405        """Create a Collection object with the same contents as this instance
1406
1407        This method creates a new Collection object with contents that match
1408        this instance's. The new collection will not be associated with any API
1409        record.
1410
1411        Arguments:
1412
1413        * new_parent: arvados.collection.Collection | None --- This value is
1414          passed to the new Collection's constructor as the `parent`
1415          argument.
1416
1417        * new_name: str | None --- This value is unused.
1418
1419        * readonly: bool --- If this value is true, this method constructs and
1420          returns a `CollectionReader`. Otherwise, it returns a mutable
1421          `Collection`. Default `False`.
1422
1423        * new_config: Mapping[str, str] | None --- This value is passed to the
1424          new Collection's constructor as `apiconfig`. If no value is provided,
1425          defaults to the configuration passed to this instance's constructor.
1426        """
1427        if new_config is None:
1428            new_config = self._config
1429        if readonly:
1430            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1431        else:
1432            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1433
1434        newcollection._clonefrom(self)
1435        return newcollection
1436
1437    @synchronized
1438    def api_response(self) -> Optional[Dict[str, Any]]:
1439        """Get this instance's associated API record
1440
1441        If this Collection instance has an associated API record, return it.
1442        Otherwise, return `None`.
1443        """
1444        return self._api_response
1445
1446    def find_or_create(
1447            self,
1448            path: str,
1449            create_type: CreateType,
1450    ) -> CollectionItem:
1451        if path == ".":
1452            return self
1453        else:
1454            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1455
1456    def find(self, path: str) -> CollectionItem:
1457        if path == ".":
1458            return self
1459        else:
1460            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1461
1462    def remove(self, path: str, recursive: bool=False) -> None:
1463        if path == ".":
1464            raise errors.ArgumentError("Cannot remove '.'")
1465        else:
1466            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1467
1468    @must_be_writable
1469    @synchronized
1470    @retry_method
1471    def save(
1472            self,
1473            properties: Optional[Properties]=None,
1474            storage_classes: Optional[StorageClasses]=None,
1475            trash_at: Optional[datetime.datetime]=None,
1476            merge: bool=True,
1477            num_retries: Optional[int]=None,
1478            preserve_version: bool=False,
1479    ) -> str:
1480        """Save collection to an existing API record
1481
1482        This method updates the instance's corresponding API record to match
1483        the instance's state. If this instance does not have a corresponding API
1484        record yet, raises `AssertionError`. (To create a new API record, use
1485        `Collection.save_new`.) This method returns the saved collection
1486        manifest.
1487
1488        Arguments:
1489
1490        * properties: dict[str, Any] | None --- If provided, the API record will
1491          be updated with these properties. Note this will completely replace
1492          any existing properties.
1493
1494        * storage_classes: list[str] | None --- If provided, the API record will
1495          be updated with this value in the `storage_classes_desired` field.
1496          This value will also be saved on the instance and used for any
1497          changes that follow.
1498
1499        * trash_at: datetime.datetime | None --- If provided, the API record
1500          will be updated with this value in the `trash_at` field.
1501
1502        * merge: bool --- If `True` (the default), this method will first
1503          reload this collection's API record, and merge any new contents into
1504          this instance before saving changes. See `Collection.update` for
1505          details.
1506
1507        * num_retries: int | None --- The number of times to retry reloading
1508          the collection's API record from the API server. If not specified,
1509          uses the `num_retries` provided when this instance was constructed.
1510
1511        * preserve_version: bool --- This value will be passed to directly
1512          to the underlying API call. If `True`, the Arvados API will
1513          preserve the versions of this collection both immediately before
1514          and after the update. If `True` when the API server is not
1515          configured with collection versioning, this method raises
1516          `arvados.errors.ArgumentError`.
1517        """
1518        if properties and type(properties) is not dict:
1519            raise errors.ArgumentError("properties must be dictionary type.")
1520
1521        if storage_classes and type(storage_classes) is not list:
1522            raise errors.ArgumentError("storage_classes must be list type.")
1523        if storage_classes:
1524            self._storage_classes_desired = storage_classes
1525
1526        if trash_at and type(trash_at) is not datetime.datetime:
1527            raise errors.ArgumentError("trash_at must be datetime type.")
1528
1529        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1530            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1531
1532        body={}
1533        if properties:
1534            body["properties"] = properties
1535        if self.storage_classes_desired():
1536            body["storage_classes_desired"] = self.storage_classes_desired()
1537        if trash_at:
1538            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1539            body["trash_at"] = t
1540        if preserve_version:
1541            body["preserve_version"] = preserve_version
1542
1543        if not self.committed():
1544            if self._has_remote_blocks:
1545                # Copy any remote blocks to the local cluster.
1546                self._copy_remote_blocks(remote_blocks={})
1547                self._has_remote_blocks = False
1548            if not self._has_collection_uuid():
1549                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1550            elif not self._has_local_collection_uuid():
1551                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1552
1553            self._my_block_manager().commit_all()
1554
1555            if merge:
1556                self.update()
1557
1558            text = self.manifest_text(strip=False)
1559            body['manifest_text'] = text
1560
1561            self._remember_api_response(self._my_api().collections().update(
1562                uuid=self._manifest_locator,
1563                body=body
1564                ).execute(num_retries=num_retries))
1565            self.set_committed(True)
1566        elif body:
1567            self._remember_api_response(self._my_api().collections().update(
1568                uuid=self._manifest_locator,
1569                body=body
1570                ).execute(num_retries=num_retries))
1571
1572        return self._manifest_text
1573
1574
1575    @must_be_writable
1576    @synchronized
1577    @retry_method
1578    def save_new(
1579            self,
1580            name: Optional[str]=None,
1581            create_collection_record: bool=True,
1582            owner_uuid: Optional[str]=None,
1583            properties: Optional[Properties]=None,
1584            storage_classes: Optional[StorageClasses]=None,
1585            trash_at: Optional[datetime.datetime]=None,
1586            ensure_unique_name: bool=False,
1587            num_retries: Optional[int]=None,
1588            preserve_version: bool=False,
1589    ):
1590        """Save collection to a new API record
1591
1592        This method finishes uploading new data blocks and (optionally)
1593        creates a new API collection record with the provided data. If a new
1594        record is created, this instance becomes associated with that record
1595        for future updates like `save()`. This method returns the saved
1596        collection manifest.
1597
1598        Arguments:
1599
1600        * name: str | None --- The `name` field to use on the new collection
1601          record. If not specified, a generic default name is generated.
1602
1603        * create_collection_record: bool --- If `True` (the default), creates a
1604          collection record on the API server. If `False`, the method finishes
1605          all data uploads and only returns the resulting collection manifest
1606          without sending it to the API server.
1607
1608        * owner_uuid: str | None --- The `owner_uuid` field to use on the
1609          new collection record.
1610
1611        * properties: dict[str, Any] | None --- The `properties` field to use on
1612          the new collection record.
1613
1614        * storage_classes: list[str] | None --- The
1615          `storage_classes_desired` field to use on the new collection record.
1616
1617        * trash_at: datetime.datetime | None --- The `trash_at` field to use
1618          on the new collection record.
1619
1620        * ensure_unique_name: bool --- This value is passed directly to the
1621          Arvados API when creating the collection record. If `True`, the API
1622          server may modify the submitted `name` to ensure the collection's
1623          `name`+`owner_uuid` combination is unique. If `False` (the default),
1624          if a collection already exists with this same `name`+`owner_uuid`
1625          combination, creating a collection record will raise a validation
1626          error.
1627
1628        * num_retries: int | None --- The number of times to retry reloading
1629          the collection's API record from the API server. If not specified,
1630          uses the `num_retries` provided when this instance was constructed.
1631
1632        * preserve_version: bool --- This value will be passed to directly
1633          to the underlying API call. If `True`, the Arvados API will
1634          preserve the versions of this collection both immediately before
1635          and after the update. If `True` when the API server is not
1636          configured with collection versioning, this method raises
1637          `arvados.errors.ArgumentError`.
1638        """
1639        if properties and type(properties) is not dict:
1640            raise errors.ArgumentError("properties must be dictionary type.")
1641
1642        if storage_classes and type(storage_classes) is not list:
1643            raise errors.ArgumentError("storage_classes must be list type.")
1644
1645        if trash_at and type(trash_at) is not datetime.datetime:
1646            raise errors.ArgumentError("trash_at must be datetime type.")
1647
1648        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1649            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1650
1651        if self._has_remote_blocks:
1652            # Copy any remote blocks to the local cluster.
1653            self._copy_remote_blocks(remote_blocks={})
1654            self._has_remote_blocks = False
1655
1656        if storage_classes:
1657            self._storage_classes_desired = storage_classes
1658
1659        self._my_block_manager().commit_all()
1660        text = self.manifest_text(strip=False)
1661
1662        if create_collection_record:
1663            if name is None:
1664                name = "New collection"
1665                ensure_unique_name = True
1666
1667            body = {"manifest_text": text,
1668                    "name": name,
1669                    "replication_desired": self.replication_desired}
1670            if owner_uuid:
1671                body["owner_uuid"] = owner_uuid
1672            if properties:
1673                body["properties"] = properties
1674            if self.storage_classes_desired():
1675                body["storage_classes_desired"] = self.storage_classes_desired()
1676            if trash_at:
1677                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1678                body["trash_at"] = t
1679            if preserve_version:
1680                body["preserve_version"] = preserve_version
1681
1682            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1683            self._manifest_locator = self._api_response["uuid"]
1684            self.set_committed(True)
1685
1686        return text
1687
    # Patterns used by `_import_manifest` to parse manifest text.
    # One whitespace-delimited token (group 1) plus the whitespace that
    # follows it, or end of text (group 2).
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # A block locator token: 32 lowercase hex digits, "+", a decimal size
    # (group 1), then any number of further "+"-prefixed hints.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # A file segment token, "position:size:name" (groups 1, 2, and 3).
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1691
1692    def _unescape_manifest_path(self, path):
1693        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1694
1695    @synchronized
1696    def _import_manifest(self, manifest_text):
1697        """Import a manifest into a `Collection`.
1698
1699        :manifest_text:
1700          The manifest text to import from.
1701
1702        """
1703        if len(self) > 0:
1704            raise ArgumentError("Can only import manifest into an empty collection")
1705
1706        STREAM_NAME = 0
1707        BLOCKS = 1
1708        SEGMENTS = 2
1709
1710        stream_name = None
1711        state = STREAM_NAME
1712
1713        for token_and_separator in self._token_re.finditer(manifest_text):
1714            tok = token_and_separator.group(1)
1715            sep = token_and_separator.group(2)
1716
1717            if state == STREAM_NAME:
1718                # starting a new stream
1719                stream_name = self._unescape_manifest_path(tok)
1720                blocks = []
1721                segments = []
1722                streamoffset = 0
1723                state = BLOCKS
1724                self.find_or_create(stream_name, COLLECTION)
1725                continue
1726
1727            if state == BLOCKS:
1728                block_locator = self._block_re.match(tok)
1729                if block_locator:
1730                    blocksize = int(block_locator.group(1))
1731                    blocks.append(streams.Range(tok, streamoffset, blocksize, 0))
1732                    streamoffset += blocksize
1733                else:
1734                    state = SEGMENTS
1735
1736            if state == SEGMENTS:
1737                file_segment = self._segment_re.match(tok)
1738                if file_segment:
1739                    pos = int(file_segment.group(1))
1740                    size = int(file_segment.group(2))
1741                    name = self._unescape_manifest_path(file_segment.group(3))
1742                    if name.split('/')[-1] == '.':
1743                        # placeholder for persisting an empty directory, not a real file
1744                        if len(name) > 2:
1745                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1746                    else:
1747                        filepath = os.path.join(stream_name, name)
1748                        try:
1749                            afile = self.find_or_create(filepath, FILE)
1750                        except IOError as e:
1751                            if e.errno == errno.ENOTDIR:
1752                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1753                            else:
1754                                raise e from None
1755                        if isinstance(afile, ArvadosFile):
1756                            afile.add_segment(blocks, pos, size)
1757                        else:
1758                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1759                else:
1760                    # error!
1761                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1762
1763            if sep == "\n":
1764                stream_name = None
1765                state = STREAM_NAME
1766
1767        self._update_token_timestamp()
1768        self.set_committed(True)
1769
1770    @synchronized
1771    def notify(
1772            self,
1773            event: ChangeType,
1774            collection: 'RichCollectionBase',
1775            name: str,
1776            item: CollectionItem,
1777    ) -> None:
1778        if self._callback:
1779            self._callback(event, collection, name, item)
1780
1781
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) within an Arvados
    `Collection`. It is returned by `Collection.find` and provides the same API.
    Operations that work on the API collection record propagate to the parent
    `Collection` object.
    """

    def __init__(self, parent, name):
        """Create the stream called `name` inside `parent`

        The new object shares its root collection's lock and copies
        `num_retries` from `parent`.
        """
        super().__init__(parent)
        root = self.root_collection()
        self.lock = root.lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self) -> 'Collection':
        """Return the root collection, as reported by this stream's parent"""
        parent = self.parent
        return parent.root_collection()

    def writable(self) -> bool:
        """Return whether the root collection is writable"""
        root = self.root_collection()
        return root.writable()

    def _my_api(self):
        """Return the root collection's API client"""
        root = self.root_collection()
        return root._my_api()

    def _my_keep(self):
        """Return the root collection's Keep client"""
        root = self.root_collection()
        return root._my_keep()

    def _my_block_manager(self):
        """Return the root collection's block manager"""
        root = self.root_collection()
        return root._my_block_manager()

    def stream_name(self) -> str:
        """Return this stream's name joined onto its parent's stream name"""
        parent_stream = self.parent.stream_name()
        return os.path.join(parent_stream, self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Return a new `Subcollection` under `new_parent` cloned from this one"""
        duplicate = Subcollection(new_parent, new_name)
        duplicate._clonefrom(self)
        return duplicate

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Move this stream to `newparent` under the name `newname`

        The stream is marked uncommitted and flushed, removed from its
        current parent, and then attached to the new parent, adopting the
        lock of the new parent's root collection.
        """
        self.set_committed(False)
        self.flush()
        old_parent = self.parent
        old_parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = newparent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Return manifest text for this stream

        A stream with no items is encoded as a single zero-length file
        segment named "." (written in escaped form) on the empty block, so
        that the empty directory is kept in the manifest.
        """
        if self._items:
            return super()._get_manifest_text(
                stream_name, strip, normalize, only_committed)
        return "%s %s 0:0:\\056\n" % (
            streams.escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1845
1846
class CollectionReader(Collection):
    """Read-only `Collection` subclass

    This class will never create or update any API collection records. You can
    use this class for additional code safety when you only need to read
    existing collections.
    """
    def __init__(self, manifest_locator_or_text=None, *args, **kwargs):
        """Load a collection, then make this instance read-only

        All arguments are passed to the `Collection` constructor.
        `manifest_locator_or_text` defaults to `None` so that
        `Collection.clone(readonly=True)`, which calls this constructor with
        keyword arguments only, does not fail with `TypeError`.
        """
        # The instance reports itself writable only while the parent
        # constructor runs (see `writable`).
        # NOTE(review): assumes `Collection.__init__` treats a `None`
        # manifest the same as an omitted one --- confirm against its
        # signature.
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatibility with old CollectionReader
        # all_streams() and all_files()
        self._streams = None

    def writable(self) -> bool:
        """Return `True` only while the constructor is running"""
        return self._in_init
FILE = 'file'

create_type value for Collection.find_or_create

COLLECTION = 'collection'

create_type value for Collection.find_or_create

ChangeList = typing.List[typing.Union[typing.Tuple[typing.Literal['add', 'del'], str, ForwardRef('Collection')], typing.Tuple[typing.Literal['mod', 'tok'], str, ForwardRef('Collection'), ForwardRef('Collection')]]]
ChangeType = typing.Literal['add', 'del', 'mod', 'tok']
CollectionItem = typing.Union[arvados.arvfile.ArvadosFile, ForwardRef('Collection')]
ChangeCallback = typing.Callable[[typing.Literal['add', 'del', 'mod', 'tok'], ForwardRef('Collection'), str, typing.Union[arvados.arvfile.ArvadosFile, ForwardRef('Collection')]], object]
CreateType = typing.Literal['collection', 'file']
Properties = typing.Dict[str, typing.Any]
StorageClasses = typing.List[str]
class CollectionBase:
 79class CollectionBase(object):
 80    """Abstract base class for Collection classes
 81
 82    .. ATTENTION:: Internal
 83       This class is meant to be used by other parts of the SDK. User code
 84       should instantiate or subclass `Collection` or one of its subclasses
 85       directly.
 86    """
 87
 88    def __enter__(self):
 89        """Enter a context block with this collection instance"""
 90        return self
 91
 92    def __exit__(self, exc_type, exc_value, traceback):
 93        """Exit a context block with this collection instance"""
 94        pass
 95
 96    def _my_keep(self):
 97        if self._keep_client is None:
 98            self._keep_client = KeepClient(api_client=self._api_client,
 99                                           num_retries=self.num_retries)
100        return self._keep_client
101
102    def stripped_manifest(self) -> str:
103        """Create a copy of the collection manifest with only size hints
104
105        This method returns a string with the current collection's manifest
106        text with all non-portable locator hints like permission hints and
107        remote cluster hints removed. The only hints in the returned manifest
108        will be size hints.
109        """
110        raw = self.manifest_text()
111        clean = []
112        for line in raw.split("\n"):
113            fields = line.split()
114            if fields:
115                clean_fields = fields[:1] + [
116                    (re.sub(r'\+[^\d][^\+]*', '', x)
117                     if re.match(arvados.util.keep_locator_pattern, x)
118                     else x)
119                    for x in fields[1:]]
120                clean += [' '.join(clean_fields), "\n"]
121        return ''.join(clean)

Abstract base class for Collection classes

def stripped_manifest(self) -> str:
102    def stripped_manifest(self) -> str:
103        """Create a copy of the collection manifest with only size hints
104
105        This method returns a string with the current collection's manifest
106        text with all non-portable locator hints like permission hints and
107        remote cluster hints removed. The only hints in the returned manifest
108        will be size hints.
109        """
110        raw = self.manifest_text()
111        clean = []
112        for line in raw.split("\n"):
113            fields = line.split()
114            if fields:
115                clean_fields = fields[:1] + [
116                    (re.sub(r'\+[^\d][^\+]*', '', x)
117                     if re.match(arvados.util.keep_locator_pattern, x)
118                     else x)
119                    for x in fields[1:]]
120                clean += [' '.join(clean_fields), "\n"]
121        return ''.join(clean)

Create a copy of the collection manifest with only size hints

This method returns a string with the current collection’s manifest text with all non-portable locator hints like permission hints and remote cluster hints removed. The only hints in the returned manifest will be size hints.

class RichCollectionBase(CollectionBase):
 147class RichCollectionBase(CollectionBase):
 148    """Base class for Collection classes
 149
 150    .. ATTENTION:: Internal
 151       This class is meant to be used by other parts of the SDK. User code
 152       should instantiate or subclass `Collection` or one of its subclasses
 153       directly.
 154    """
 155
 156    def __init__(self, parent=None):
 157        self.parent = parent
 158        self._committed = False
 159        self._has_remote_blocks = False
 160        self._callback = None
 161        self._items = {}
 162
 163    def _my_api(self):
 164        raise NotImplementedError()
 165
 166    def _my_keep(self):
 167        raise NotImplementedError()
 168
 169    def _my_block_manager(self):
 170        raise NotImplementedError()
 171
 172    def writable(self) -> bool:
 173        """Indicate whether this collection object can be modified
 174
 175        This method returns `False` if this object is a `CollectionReader`,
 176        else `True`.
 177        """
 178        raise NotImplementedError()
 179
 180    def root_collection(self) -> 'Collection':
 181        """Get this collection's root collection object
 182
 183        If you open a subcollection with `Collection.find`, calling this method
 184        on that subcollection returns the source Collection object.
 185        """
 186        raise NotImplementedError()
 187
 188    def stream_name(self) -> str:
 189        """Get the name of the manifest stream represented by this collection
 190
 191        If you open a subcollection with `Collection.find`, calling this method
 192        on that subcollection returns the name of the stream you opened.
 193        """
 194        raise NotImplementedError()
 195
 196    @synchronized
 197    def has_remote_blocks(self) -> bool:
 198        """Indiciate whether the collection refers to remote data
 199
 200        Returns `True` if the collection manifest includes any Keep locators
 201        with a remote hint (`+R`), else `False`.
 202        """
 203        if self._has_remote_blocks:
 204            return True
 205        for item in self:
 206            if self[item].has_remote_blocks():
 207                return True
 208        return False
 209
 210    @synchronized
 211    def set_has_remote_blocks(self, val: bool) -> None:
 212        """Cache whether this collection refers to remote blocks
 213
 214        .. ATTENTION:: Internal
 215           This method is only meant to be used by other Collection methods.
 216
 217        Set this collection's cached "has remote blocks" flag to the given
 218        value.
 219        """
 220        self._has_remote_blocks = val
 221        if self.parent:
 222            self.parent.set_has_remote_blocks(val)
 223
 224    @must_be_writable
 225    @synchronized
 226    def find_or_create(
 227            self,
 228            path: str,
 229            create_type: CreateType,
 230    ) -> CollectionItem:
 231        """Get the item at the given path, creating it if necessary
 232
 233        If `path` refers to a stream in this collection, returns a
 234        corresponding `Subcollection` object. If `path` refers to a file in
 235        this collection, returns a corresponding
 236        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 237        this collection, then this method creates a new object and returns
 238        it, creating parent streams as needed. The type of object created is
 239        determined by the value of `create_type`.
 240
 241        Arguments:
 242
 243        * path: str --- The path to find or create within this collection.
 244
 245        * create_type: Literal[COLLECTION, FILE] --- The type of object to
 246          create at `path` if one does not exist. Passing `COLLECTION`
 247          creates a stream and returns the corresponding
 248          `Subcollection`. Passing `FILE` creates a new file and returns the
 249          corresponding `arvados.arvfile.ArvadosFile`.
 250        """
 251        pathcomponents = path.split("/", 1)
 252        if pathcomponents[0]:
 253            item = self._items.get(pathcomponents[0])
 254            if len(pathcomponents) == 1:
 255                if item is None:
 256                    # create new file
 257                    if create_type == COLLECTION:
 258                        item = Subcollection(self, pathcomponents[0])
 259                    else:
 260                        item = ArvadosFile(self, pathcomponents[0])
 261                    self._items[pathcomponents[0]] = item
 262                    self.set_committed(False)
 263                    self.notify(ADD, self, pathcomponents[0], item)
 264                return item
 265            else:
 266                if item is None:
 267                    # create new collection
 268                    item = Subcollection(self, pathcomponents[0])
 269                    self._items[pathcomponents[0]] = item
 270                    self.set_committed(False)
 271                    self.notify(ADD, self, pathcomponents[0], item)
 272                if isinstance(item, RichCollectionBase):
 273                    return item.find_or_create(pathcomponents[1], create_type)
 274                else:
 275                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 276        else:
 277            return self
 278
 279    @synchronized
 280    def find(self, path: str) -> CollectionItem:
 281        """Get the item at the given path
 282
 283        If `path` refers to a stream in this collection, returns a
 284        corresponding `Subcollection` object. If `path` refers to a file in
 285        this collection, returns a corresponding
 286        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 287        this collection, then this method raises `NotADirectoryError`.
 288
 289        Arguments:
 290
 291        * path: str --- The path to find or create within this collection.
 292        """
 293        if not path:
 294            raise errors.ArgumentError("Parameter 'path' is empty.")
 295
 296        pathcomponents = path.split("/", 1)
 297        if pathcomponents[0] == '':
 298            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 299
 300        item = self._items.get(pathcomponents[0])
 301        if item is None:
 302            return None
 303        elif len(pathcomponents) == 1:
 304            return item
 305        else:
 306            if isinstance(item, RichCollectionBase):
 307                if pathcomponents[1]:
 308                    return item.find(pathcomponents[1])
 309                else:
 310                    return item
 311            else:
 312                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 313
 314    @synchronized
 315    def mkdirs(self, path: str) -> 'Subcollection':
 316        """Create and return a subcollection at `path`
 317
 318        If `path` exists within this collection, raises `FileExistsError`.
 319        Otherwise, creates a stream at that path and returns the
 320        corresponding `Subcollection`.
 321        """
 322        if self.find(path) != None:
 323            raise IOError(errno.EEXIST, "Directory or file exists", path)
 324
 325        return self.find_or_create(path, COLLECTION)
 326
 327    def open(
 328            self,
 329            path: str,
 330            mode: str="r",
 331            encoding: Optional[str]=None
 332    ) -> IO:
 333        """Open a file-like object within the collection
 334
 335        This method returns a file-like object that can read and/or write the
 336        file located at `path` within the collection. If you attempt to write
 337        a `path` that does not exist, the file is created with `find_or_create`.
 338        If the file cannot be opened for any other reason, this method raises
 339        `OSError` with an appropriate errno.
 340
 341        Arguments:
 342
 343        * path: str --- The path of the file to open within this collection
 344
 345        * mode: str --- The mode to open this file. Supports all the same
 346          values as `builtins.open`.
 347
 348        * encoding: str | None --- The text encoding of the file. Only used
 349          when the file is opened in text mode. The default is
 350          platform-dependent.
 351
 352        """
 353        if not re.search(r'^[rwa][bt]?\+?$', mode):
 354            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 355
 356        if mode[0] == 'r' and '+' not in mode:
 357            fclass = ArvadosFileReader
 358            arvfile = self.find(path)
 359        elif not self.writable():
 360            raise IOError(errno.EROFS, "Collection is read only")
 361        else:
 362            fclass = ArvadosFileWriter
 363            arvfile = self.find_or_create(path, FILE)
 364
 365        if arvfile is None:
 366            raise IOError(errno.ENOENT, "File not found", path)
 367        if not isinstance(arvfile, ArvadosFile):
 368            raise IOError(errno.EISDIR, "Is a directory", path)
 369
 370        if mode[0] == 'w':
 371            arvfile.truncate(0)
 372
 373        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
 374        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
 375        if 'b' not in mode:
 376            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
 377            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
 378        return f
 379
 380    def modified(self) -> bool:
 381        """Indicate whether this collection has an API server record
 382
 383        Returns `False` if this collection corresponds to a record loaded from
 384        the API server, `True` otherwise.
 385        """
 386        return not self.committed()
 387
 388    @synchronized
 389    def committed(self):
 390        """Indicate whether this collection has an API server record
 391
 392        Returns `True` if this collection corresponds to a record loaded from
 393        the API server, `False` otherwise.
 394        """
 395        return self._committed
 396
 397    @synchronized
 398    def set_committed(self, value: bool=True):
 399        """Cache whether this collection has an API server record
 400
 401        .. ATTENTION:: Internal
 402           This method is only meant to be used by other Collection methods.
 403
 404        Set this collection's cached "committed" flag to the given
 405        value and propagates it as needed.
 406        """
 407        if value == self._committed:
 408            return
 409        if value:
 410            for k,v in self._items.items():
 411                v.set_committed(True)
 412            self._committed = True
 413        else:
 414            self._committed = False
 415            if self.parent is not None:
 416                self.parent.set_committed(False)
 417
 418    @synchronized
 419    def __iter__(self) -> Iterator[str]:
 420        """Iterate names of streams and files in this collection
 421
 422        This method does not recurse. It only iterates the contents of this
 423        collection's corresponding stream.
 424        """
 425        return iter(self._items)
 426
 427    @synchronized
 428    def __getitem__(self, k: str) -> CollectionItem:
 429        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
 430
 431        This method does not recurse. If you want to search a path, use
 432        `RichCollectionBase.find` instead.
 433        """
 434        return self._items[k]
 435
 436    @synchronized
 437    def __contains__(self, k: str) -> bool:
 438        """Indicate whether this collection has an item with this name
 439
 440        This method does not recurse. It you want to check a path, use
 441        `RichCollectionBase.exists` instead.
 442        """
 443        return k in self._items
 444
 445    @synchronized
 446    def __len__(self):
 447        """Get the number of items directly contained in this collection
 448
 449        This method does not recurse. It only counts the streams and files
 450        in this collection's corresponding stream.
 451        """
 452        return len(self._items)
 453
 454    @must_be_writable
 455    @synchronized
 456    def __delitem__(self, p: str) -> None:
 457        """Delete an item from this collection's stream
 458
 459        This method does not recurse. If you want to remove an item by a
 460        path, use `RichCollectionBase.remove` instead.
 461        """
 462        del self._items[p]
 463        self.set_committed(False)
 464        self.notify(DEL, self, p, None)
 465
 466    @synchronized
 467    def keys(self) -> Iterator[str]:
 468        """Iterate names of streams and files in this collection
 469
 470        This method does not recurse. It only iterates the contents of this
 471        collection's corresponding stream.
 472        """
 473        return self._items.keys()
 474
 475    @synchronized
 476    def values(self) -> List[CollectionItem]:
 477        """Get a list of objects in this collection's stream
 478
 479        The return value includes a `Subcollection` for every stream, and an
 480        `arvados.arvfile.ArvadosFile` for every file, directly within this
 481        collection's stream.  This method does not recurse.
 482        """
 483        return list(self._items.values())
 484
 485    @synchronized
 486    def items(self) -> List[Tuple[str, CollectionItem]]:
 487        """Get a list of `(name, object)` tuples from this collection's stream
 488
 489        The return value includes a `Subcollection` for every stream, and an
 490        `arvados.arvfile.ArvadosFile` for every file, directly within this
 491        collection's stream.  This method does not recurse.
 492        """
 493        return list(self._items.items())
 494
 495    def exists(self, path: str) -> bool:
 496        """Indicate whether this collection includes an item at `path`
 497
 498        This method returns `True` if `path` refers to a stream or file within
 499        this collection, else `False`.
 500
 501        Arguments:
 502
 503        * path: str --- The path to check for existence within this collection
 504        """
 505        return self.find(path) is not None
 506
 507    @must_be_writable
 508    @synchronized
 509    def remove(self, path: str, recursive: bool=False) -> None:
 510        """Remove the file or stream at `path`
 511
 512        Arguments:
 513
 514        * path: str --- The path of the item to remove from the collection
 515
 516        * recursive: bool --- Controls the method's behavior if `path` refers
 517          to a nonempty stream. If `False` (the default), this method raises
 518          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
 519          items under the stream.
 520        """
 521        if not path:
 522            raise errors.ArgumentError("Parameter 'path' is empty.")
 523
 524        pathcomponents = path.split("/", 1)
 525        item = self._items.get(pathcomponents[0])
 526        if item is None:
 527            raise IOError(errno.ENOENT, "File not found", path)
 528        if len(pathcomponents) == 1:
 529            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
 530                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
 531            deleteditem = self._items[pathcomponents[0]]
 532            del self._items[pathcomponents[0]]
 533            self.set_committed(False)
 534            self.notify(DEL, self, pathcomponents[0], deleteditem)
 535        else:
 536            item.remove(pathcomponents[1], recursive=recursive)
 537
 538    def _clonefrom(self, source):
 539        for k,v in source.items():
 540            self._items[k] = v.clone(self, k)
 541
 542    def clone(self):
 543        raise NotImplementedError()
 544
 545    @must_be_writable
 546    @synchronized
 547    def add(
 548            self,
 549            source_obj: CollectionItem,
 550            target_name: str,
 551            overwrite: bool=False,
 552            reparent: bool=False,
 553    ) -> None:
 554        """Copy or move a file or subcollection object to this collection
 555
 556        Arguments:
 557
 558        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
 559          to add to this collection
 560
 561        * target_name: str --- The path inside this collection where
 562          `source_obj` should be added.
 563
 564        * overwrite: bool --- Controls the behavior of this method when the
 565          collection already contains an object at `target_name`. If `False`
 566          (the default), this method will raise `FileExistsError`. If `True`,
 567          the object at `target_name` will be replaced with `source_obj`.
 568
 569        * reparent: bool --- Controls whether this method copies or moves
 570          `source_obj`. If `False` (the default), `source_obj` is copied into
 571          this collection. If `True`, `source_obj` is moved into this
 572          collection.
 573        """
 574        if target_name in self and not overwrite:
 575            raise IOError(errno.EEXIST, "File already exists", target_name)
 576
 577        modified_from = None
 578        if target_name in self:
 579            modified_from = self[target_name]
 580
 581        # Actually make the move or copy.
 582        if reparent:
 583            source_obj._reparent(self, target_name)
 584            item = source_obj
 585        else:
 586            item = source_obj.clone(self, target_name)
 587
 588        self._items[target_name] = item
 589        self.set_committed(False)
 590        if not self._has_remote_blocks and source_obj.has_remote_blocks():
 591            self.set_has_remote_blocks(True)
 592
 593        if modified_from:
 594            self.notify(MOD, self, target_name, (modified_from, item))
 595        else:
 596            self.notify(ADD, self, target_name, item)
 597
 598    def _get_src_target(self, source, target_path, source_collection, create_dest):
 599        if source_collection is None:
 600            source_collection = self
 601
 602        # Find the object
 603        if isinstance(source, str):
 604            source_obj = source_collection.find(source)
 605            if source_obj is None:
 606                raise IOError(errno.ENOENT, "File not found", source)
 607            sourcecomponents = source.split("/")
 608        else:
 609            source_obj = source
 610            sourcecomponents = None
 611
 612        # Find parent collection the target path
 613        targetcomponents = target_path.split("/")
 614
 615        # Determine the name to use.
 616        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 617
 618        if not target_name:
 619            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 620
 621        if create_dest:
 622            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 623        else:
 624            if len(targetcomponents) > 1:
 625                target_dir = self.find("/".join(targetcomponents[0:-1]))
 626            else:
 627                target_dir = self
 628
 629        if target_dir is None:
 630            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 631
 632        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
 633            target_dir = target_dir[target_name]
 634            target_name = sourcecomponents[-1]
 635
 636        return (source_obj, target_dir, target_name)
 637
 638    @must_be_writable
 639    @synchronized
 640    def copy(
 641            self,
 642            source: Union[str, CollectionItem],
 643            target_path: str,
 644            source_collection: Optional['RichCollectionBase']=None,
 645            overwrite: bool=False,
 646    ) -> None:
 647        """Copy a file or subcollection object to this collection
 648
 649        Arguments:
 650
 651        * source: str | arvados.arvfile.ArvadosFile |
 652          arvados.collection.Subcollection --- The file or subcollection to
 653          add to this collection. If `source` is a str, the object will be
 654          found by looking up this path from `source_collection` (see
 655          below).
 656
 657        * target_path: str --- The path inside this collection where the
 658          source object should be added.
 659
 660        * source_collection: arvados.collection.Collection | None --- The
 661          collection to find the source object from when `source` is a
 662          path. Defaults to the current collection (`self`).
 663
 664        * overwrite: bool --- Controls the behavior of this method when the
 665          collection already contains an object at `target_path`. If `False`
 666          (the default), this method will raise `FileExistsError`. If `True`,
 667          the object at `target_path` will be replaced with `source_obj`.
 668        """
 669        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 670        target_dir.add(source_obj, target_name, overwrite, False)
 671
 672    @must_be_writable
 673    @synchronized
 674    def rename(
 675            self,
 676            source: Union[str, CollectionItem],
 677            target_path: str,
 678            source_collection: Optional['RichCollectionBase']=None,
 679            overwrite: bool=False,
 680    ) -> None:
 681        """Move a file or subcollection object to this collection
 682
 683        Arguments:
 684
 685        * source: str | arvados.arvfile.ArvadosFile |
 686          arvados.collection.Subcollection --- The file or subcollection to
 687          add to this collection. If `source` is a str, the object will be
 688          found by looking up this path from `source_collection` (see
 689          below).
 690
 691        * target_path: str --- The path inside this collection where the
 692          source object should be added.
 693
 694        * source_collection: arvados.collection.Collection | None --- The
 695          collection to find the source object from when `source` is a
 696          path. Defaults to the current collection (`self`).
 697
 698        * overwrite: bool --- Controls the behavior of this method when the
 699          collection already contains an object at `target_path`. If `False`
 700          (the default), this method will raise `FileExistsError`. If `True`,
 701          the object at `target_path` will be replaced with `source_obj`.
 702        """
 703        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 704        if not source_obj.writable():
 705            raise IOError(errno.EROFS, "Source collection is read only", source)
 706        target_dir.add(source_obj, target_name, overwrite, True)
 707
 708    def portable_manifest_text(self, stream_name: str=".") -> str:
 709        """Get the portable manifest text for this collection
 710
 711        The portable manifest text is normalized, and does not include access
 712        tokens. This method does not flush outstanding blocks to Keep.
 713
 714        Arguments:
 715
 716        * stream_name: str --- The name to use for this collection's stream in
 717          the generated manifest. Default `'.'`.
 718        """
 719        return self._get_manifest_text(stream_name, True, True)
 720
 721    @synchronized
 722    def manifest_text(
 723            self,
 724            stream_name: str=".",
 725            strip: bool=False,
 726            normalize: bool=False,
 727            only_committed: bool=False,
 728    ) -> str:
 729        """Get the manifest text for this collection
 730
 731        Arguments:
 732
 733        * stream_name: str --- The name to use for this collection's stream in
 734          the generated manifest. Default `'.'`.
 735
 736        * strip: bool --- Controls whether or not the returned manifest text
 737          includes access tokens. If `False` (the default), the manifest text
 738          will include access tokens. If `True`, the manifest text will not
 739          include access tokens.
 740
 741        * normalize: bool --- Controls whether or not the returned manifest
 742          text is normalized. Default `False`.
 743
 744        * only_committed: bool --- Controls whether or not this method uploads
 745          pending data to Keep before building and returning the manifest text.
 746          If `False` (the default), this method will finish uploading all data
 747          to Keep, then return the final manifest. If `True`, this method will
 748          build and return a manifest that only refers to the data that has
 749          finished uploading at the time this method was called.
 750        """
 751        if not only_committed:
 752            self._my_block_manager().commit_all()
 753        return self._get_manifest_text(stream_name, strip, normalize,
 754                                       only_committed=only_committed)
 755
 756    @synchronized
 757    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
 758        """Get the manifest text for this collection, sub collections and files.
 759
 760        :stream_name:
 761          Name to use for this stream (directory)
 762
 763        :strip:
 764          If True, remove signing tokens from block locators if present.
 765          If False (default), block locators are left unchanged.
 766
 767        :normalize:
 768          If True, always export the manifest text in normalized form
 769          even if the Collection is not modified.  If False (default) and the collection
 770          is not modified, return the original manifest text even if it is not
 771          in normalized form.
 772
 773        :only_committed:
 774          If True, only include blocks that were already committed to Keep.
 775
 776        """
 777
 778        if not self.committed() or self._manifest_text is None or normalize:
 779            stream = {}
 780            buf = []
 781            sorted_keys = sorted(self.keys())
 782            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
 783                # Create a stream per file `k`
 784                arvfile = self[filename]
 785                filestream = []
 786                for segment in arvfile.segments():
 787                    loc = segment.locator
 788                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
 789                        if only_committed:
 790                            continue
 791                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
 792                    if strip:
 793                        loc = KeepLocator(loc).stripped()
 794                    filestream.append(streams.LocatorAndRange(
 795                        loc,
 796                        KeepLocator(loc).size,
 797                        segment.segment_offset,
 798                        segment.range_size,
 799                    ))
 800                stream[filename] = filestream
 801            if stream:
 802                buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n")
 803            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
 804                buf.append(self[dirname].manifest_text(
 805                    stream_name=os.path.join(stream_name, dirname),
 806                    strip=strip, normalize=True, only_committed=only_committed))
 807            return "".join(buf)
 808        else:
 809            if strip:
 810                return self.stripped_manifest()
 811            else:
 812                return self._manifest_text
 813
 814    @synchronized
 815    def _copy_remote_blocks(self, remote_blocks={}):
 816        """Scan through the entire collection and ask Keep to copy remote blocks.
 817
 818        When accessing a remote collection, blocks will have a remote signature
 819        (+R instead of +A). Collect these signatures and request Keep to copy the
 820        blocks to the local cluster, returning local (+A) signatures.
 821
 822        :remote_blocks:
 823          Shared cache of remote to local block mappings. This is used to avoid
 824          doing extra work when blocks are shared by more than one file in
 825          different subdirectories.
 826
 827        """
 828        for item in self:
 829            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
 830        return remote_blocks
 831
 832    @synchronized
 833    def diff(
 834            self,
 835            end_collection: 'RichCollectionBase',
 836            prefix: str=".",
 837            holding_collection: Optional['Collection']=None,
 838    ) -> ChangeList:
 839        """Build a list of differences between this collection and another
 840
 841        Arguments:
 842
 843        * end_collection: arvados.collection.RichCollectionBase --- A
 844          collection object with the desired end state. The returned diff
 845          list will describe how to go from the current collection object
 846          `self` to `end_collection`.
 847
 848        * prefix: str --- The name to use for this collection's stream in
 849          the diff list. Default `'.'`.
 850
 851        * holding_collection: arvados.collection.Collection | None --- A
 852          collection object used to hold objects for the returned diff
 853          list. By default, a new empty collection is created.
 854        """
 855        changes = []
 856        if holding_collection is None:
 857            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
 858        for k in self:
 859            if k not in end_collection:
 860               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
 861        for k in end_collection:
 862            if k in self:
 863                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
 864                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
 865                elif end_collection[k] != self[k]:
 866                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 867                else:
 868                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 869            else:
 870                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
 871        return changes
 872
 873    @must_be_writable
 874    @synchronized
 875    def apply(self, changes: ChangeList) -> None:
 876        """Apply a list of changes from to this collection
 877
 878        This method takes a list of changes generated by
 879        `RichCollectionBase.diff` and applies it to this
 880        collection. Afterward, the state of this collection object will
 881        match the state of `end_collection` passed to `diff`. If a change
 882        conflicts with a local change, it will be saved to an alternate path
 883        indicating the conflict.
 884
 885        Arguments:
 886
 887        * changes: arvados.collection.ChangeList --- The list of differences
 888          generated by `RichCollectionBase.diff`.
 889        """
 890        if changes:
 891            self.set_committed(False)
 892        for change in changes:
 893            event_type = change[0]
 894            path = change[1]
 895            initial = change[2]
 896            local = self.find(path)
 897            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
 898                                                                    time.gmtime()))
 899            if event_type == ADD:
 900                if local is None:
 901                    # No local file at path, safe to copy over new file
 902                    self.copy(initial, path)
 903                elif local is not None and local != initial:
 904                    # There is already local file and it is different:
 905                    # save change to conflict file.
 906                    self.copy(initial, conflictpath)
 907            elif event_type == MOD or event_type == TOK:
 908                final = change[3]
 909                if local == initial:
 910                    # Local matches the "initial" item so it has not
 911                    # changed locally and is safe to update.
 912                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
 913                        # Replace contents of local file with new contents
 914                        local.replace_contents(final)
 915                    else:
 916                        # Overwrite path with new item; this can happen if
 917                        # path was a file and is now a collection or vice versa
 918                        self.copy(final, path, overwrite=True)
 919                elif event_type == MOD:
 920                    # Local doesn't match the "start" value or local
 921                    # is missing (presumably deleted) so save change
 922                    # to conflict file.  Don't do this for TOK events
 923                    # which means the file didn't change but only had
 924                    # tokens updated.
 925                    self.copy(final, conflictpath)
 926            elif event_type == DEL:
 927                if local == initial:
 928                    # Local item matches "initial" value, so it is safe to remove.
 929                    self.remove(path, recursive=True)
 930                # else, the file is modified or already removed, in either
 931                # case we don't want to try to remove it.
 932
 933    def portable_data_hash(self) -> str:
 934        """Get the portable data hash for this collection's manifest"""
 935        if self._manifest_locator and self.committed():
 936            # If the collection is already saved on the API server, and it's committed
 937            # then return API server's PDH response.
 938            return self._portable_data_hash
 939        else:
 940            stripped = self.portable_manifest_text().encode()
 941            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 942
 943    @synchronized
 944    def subscribe(self, callback: ChangeCallback) -> None:
 945        """Set a notify callback for changes to this collection
 946
 947        Arguments:
 948
 949        * callback: arvados.collection.ChangeCallback --- The callable to
 950          call each time the collection is changed.
 951        """
 952        if self._callback is None:
 953            self._callback = callback
 954        else:
 955            raise errors.ArgumentError("A callback is already set on this collection.")
 956
 957    @synchronized
 958    def unsubscribe(self) -> None:
 959        """Remove any notify callback set for changes to this collection"""
 960        if self._callback is not None:
 961            self._callback = None
 962
 963    @synchronized
 964    def notify(
 965            self,
 966            event: ChangeType,
 967            collection: 'RichCollectionBase',
 968            name: str,
 969            item: CollectionItem,
 970    ) -> None:
 971        """Notify any subscribed callback about a change to this collection
 972
 973        .. ATTENTION:: Internal
 974           This method is only meant to be used by other Collection methods.
 975
 976        If a callback has been registered with `RichCollectionBase.subscribe`,
 977        it will be called with information about a change to this collection.
 978        Then this notification will be propagated to this collection's root.
 979
 980        Arguments:
 981
 982        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 983          the collection.
 984
 985        * collection: arvados.collection.RichCollectionBase --- The
 986          collection that was modified.
 987
 988        * name: str --- The name of the file or stream within `collection` that
 989          was modified.
 990
 991        * item: arvados.arvfile.ArvadosFile |
 992          arvados.collection.Subcollection --- For ADD events, the new
 993          contents at `name` within `collection`; for DEL events, the
 994          item that was removed.  For MOD and TOK events, a 2-tuple of
 995          the previous item and the new item (may be the same object
 996          or different, depending on whether the action involved it
 997          being modified in place or replaced).
 998
 999        """
1000        if self._callback:
1001            self._callback(event, collection, name, item)
1002        self.root_collection().notify(event, collection, name, item)
1003
1004    @synchronized
1005    def __eq__(self, other: Any) -> bool:
1006        """Indicate whether this collection object is equal to another"""
1007        if other is self:
1008            return True
1009        if not isinstance(other, RichCollectionBase):
1010            return False
1011        if len(self._items) != len(other):
1012            return False
1013        for k in self._items:
1014            if k not in other:
1015                return False
1016            if self._items[k] != other[k]:
1017                return False
1018        return True
1019
1020    def __ne__(self, other: Any) -> bool:
1021        """Indicate whether this collection object is not equal to another"""
1022        return not self.__eq__(other)
1023
1024    @synchronized
1025    def flush(self) -> None:
1026        """Upload any pending data to Keep"""
1027        for e in self.values():
1028            e.flush()

Base class for Collection classes

RichCollectionBase(parent=None)
156    def __init__(self, parent=None):
157        self.parent = parent
158        self._committed = False
159        self._has_remote_blocks = False
160        self._callback = None
161        self._items = {}
parent
def writable(self) -> bool:
172    def writable(self) -> bool:
173        """Indicate whether this collection object can be modified
174
175        This method returns `False` if this object is a `CollectionReader`,
176        else `True`.
177        """
178        raise NotImplementedError()

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

def root_collection(self) -> Collection:
180    def root_collection(self) -> 'Collection':
181        """Get this collection's root collection object
182
183        If you open a subcollection with `Collection.find`, calling this method
184        on that subcollection returns the source Collection object.
185        """
186        raise NotImplementedError()

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def stream_name(self) -> str:
188    def stream_name(self) -> str:
189        """Get the name of the manifest stream represented by this collection
190
191        If you open a subcollection with `Collection.find`, calling this method
192        on that subcollection returns the name of the stream you opened.
193        """
194        raise NotImplementedError()

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

@synchronized
def has_remote_blocks(self) -> bool:
196    @synchronized
197    def has_remote_blocks(self) -> bool:
198        """Indiciate whether the collection refers to remote data
199
200        Returns `True` if the collection manifest includes any Keep locators
201        with a remote hint (`+R`), else `False`.
202        """
203        if self._has_remote_blocks:
204            return True
205        for item in self:
206            if self[item].has_remote_blocks():
207                return True
208        return False

Indicate whether the collection refers to remote data

Returns True if the collection manifest includes any Keep locators with a remote hint (+R), else False.

@synchronized
def set_has_remote_blocks(self, val: bool) -> None:
210    @synchronized
211    def set_has_remote_blocks(self, val: bool) -> None:
212        """Cache whether this collection refers to remote blocks
213
214        .. ATTENTION:: Internal
215           This method is only meant to be used by other Collection methods.
216
217        Set this collection's cached "has remote blocks" flag to the given
218        value.
219        """
220        self._has_remote_blocks = val
221        if self.parent:
222            self.parent.set_has_remote_blocks(val)

Cache whether this collection refers to remote blocks

Set this collection’s cached “has remote blocks” flag to the given value.

@must_be_writable
@synchronized
def find_or_create( self, path: str, create_type: Literal['collection', 'file']) -> Union[arvados.arvfile.ArvadosFile, Collection]:
224    @must_be_writable
225    @synchronized
226    def find_or_create(
227            self,
228            path: str,
229            create_type: CreateType,
230    ) -> CollectionItem:
231        """Get the item at the given path, creating it if necessary
232
233        If `path` refers to a stream in this collection, returns a
234        corresponding `Subcollection` object. If `path` refers to a file in
235        this collection, returns a corresponding
236        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
237        this collection, then this method creates a new object and returns
238        it, creating parent streams as needed. The type of object created is
239        determined by the value of `create_type`.
240
241        Arguments:
242
243        * path: str --- The path to find or create within this collection.
244
245        * create_type: Literal[COLLECTION, FILE] --- The type of object to
246          create at `path` if one does not exist. Passing `COLLECTION`
247          creates a stream and returns the corresponding
248          `Subcollection`. Passing `FILE` creates a new file and returns the
249          corresponding `arvados.arvfile.ArvadosFile`.
250        """
251        pathcomponents = path.split("/", 1)
252        if pathcomponents[0]:
253            item = self._items.get(pathcomponents[0])
254            if len(pathcomponents) == 1:
255                if item is None:
256                    # create new file
257                    if create_type == COLLECTION:
258                        item = Subcollection(self, pathcomponents[0])
259                    else:
260                        item = ArvadosFile(self, pathcomponents[0])
261                    self._items[pathcomponents[0]] = item
262                    self.set_committed(False)
263                    self.notify(ADD, self, pathcomponents[0], item)
264                return item
265            else:
266                if item is None:
267                    # create new collection
268                    item = Subcollection(self, pathcomponents[0])
269                    self._items[pathcomponents[0]] = item
270                    self.set_committed(False)
271                    self.notify(ADD, self, pathcomponents[0], item)
272                if isinstance(item, RichCollectionBase):
273                    return item.find_or_create(pathcomponents[1], create_type)
274                else:
275                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
276        else:
277            return self

Get the item at the given path, creating it if necessary

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of create_type.

Arguments:

  • path: str — The path to find or create within this collection.

  • create_type: Literal[COLLECTION, FILE] — The type of object to create at path if one does not exist. Passing COLLECTION creates a stream and returns the corresponding Subcollection. Passing FILE creates a new file and returns the corresponding arvados.arvfile.ArvadosFile.

@synchronized
def find( self, path: str) -> Union[arvados.arvfile.ArvadosFile, Collection]:
279    @synchronized
280    def find(self, path: str) -> CollectionItem:
281        """Get the item at the given path
282
283        If `path` refers to a stream in this collection, returns a
284        corresponding `Subcollection` object. If `path` refers to a file in
285        this collection, returns a corresponding
286        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
287        this collection, then this method raises `NotADirectoryError`.
288
289        Arguments:
290
291        * path: str --- The path to find or create within this collection.
292        """
293        if not path:
294            raise errors.ArgumentError("Parameter 'path' is empty.")
295
296        pathcomponents = path.split("/", 1)
297        if pathcomponents[0] == '':
298            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
299
300        item = self._items.get(pathcomponents[0])
301        if item is None:
302            return None
303        elif len(pathcomponents) == 1:
304            return item
305        else:
306            if isinstance(item, RichCollectionBase):
307                if pathcomponents[1]:
308                    return item.find(pathcomponents[1])
309                else:
310                    return item
311            else:
312                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

Get the item at the given path

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method returns None. If an intermediate component of path is a file rather than a stream, this method raises NotADirectoryError.

Arguments:

  • path: str — The path to look up within this collection.
@synchronized
def mkdirs(self, path: str) -> Subcollection:
314    @synchronized
315    def mkdirs(self, path: str) -> 'Subcollection':
316        """Create and return a subcollection at `path`
317
318        If `path` exists within this collection, raises `FileExistsError`.
319        Otherwise, creates a stream at that path and returns the
320        corresponding `Subcollection`.
321        """
322        if self.find(path) != None:
323            raise IOError(errno.EEXIST, "Directory or file exists", path)
324
325        return self.find_or_create(path, COLLECTION)

Create and return a subcollection at path

If path exists within this collection, raises FileExistsError. Otherwise, creates a stream at that path and returns the corresponding Subcollection.

def open( self, path: str, mode: str = 'r', encoding: Optional[str] = None) -> <class 'IO'>:
327    def open(
328            self,
329            path: str,
330            mode: str="r",
331            encoding: Optional[str]=None
332    ) -> IO:
333        """Open a file-like object within the collection
334
335        This method returns a file-like object that can read and/or write the
336        file located at `path` within the collection. If you attempt to write
337        a `path` that does not exist, the file is created with `find_or_create`.
338        If the file cannot be opened for any other reason, this method raises
339        `OSError` with an appropriate errno.
340
341        Arguments:
342
343        * path: str --- The path of the file to open within this collection
344
345        * mode: str --- The mode to open this file. Supports all the same
346          values as `builtins.open`.
347
348        * encoding: str | None --- The text encoding of the file. Only used
349          when the file is opened in text mode. The default is
350          platform-dependent.
351
352        """
353        if not re.search(r'^[rwa][bt]?\+?$', mode):
354            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
355
356        if mode[0] == 'r' and '+' not in mode:
357            fclass = ArvadosFileReader
358            arvfile = self.find(path)
359        elif not self.writable():
360            raise IOError(errno.EROFS, "Collection is read only")
361        else:
362            fclass = ArvadosFileWriter
363            arvfile = self.find_or_create(path, FILE)
364
365        if arvfile is None:
366            raise IOError(errno.ENOENT, "File not found", path)
367        if not isinstance(arvfile, ArvadosFile):
368            raise IOError(errno.EISDIR, "Is a directory", path)
369
370        if mode[0] == 'w':
371            arvfile.truncate(0)
372
373        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
374        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
375        if 'b' not in mode:
376            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
377            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
378        return f

Open a file-like object within the collection

This method returns a file-like object that can read and/or write the file located at path within the collection. If you attempt to write a path that does not exist, the file is created with find_or_create. If the file cannot be opened for any other reason, this method raises OSError with an appropriate errno.

Arguments:

  • path: str — The path of the file to open within this collection

  • mode: str — The mode to open this file. Must be r, w, or a, optionally followed by b or t, optionally followed by +.

  • encoding: str | None — The text encoding of the file. Only used when the file is opened in text mode. The default is platform-dependent.

def modified(self) -> bool:
380    def modified(self) -> bool:
381        """Indicate whether this collection has an API server record
382
383        Returns `False` if this collection corresponds to a record loaded from
384        the API server, `True` otherwise.
385        """
386        return not self.committed()

Indicate whether this collection lacks a matching API server record (the inverse of committed)

Returns False if this collection corresponds to a record loaded from the API server, True otherwise.

@synchronized
def committed(self):
388    @synchronized
389    def committed(self):
390        """Indicate whether this collection has an API server record
391
392        Returns `True` if this collection corresponds to a record loaded from
393        the API server, `False` otherwise.
394        """
395        return self._committed

Indicate whether this collection has an API server record

Returns True if this collection corresponds to a record loaded from the API server, False otherwise.

@synchronized
def set_committed(self, value: bool = True):
397    @synchronized
398    def set_committed(self, value: bool=True):
399        """Cache whether this collection has an API server record
400
401        .. ATTENTION:: Internal
402           This method is only meant to be used by other Collection methods.
403
404        Set this collection's cached "committed" flag to the given
405        value and propagates it as needed.
406        """
407        if value == self._committed:
408            return
409        if value:
410            for k,v in self._items.items():
411                v.set_committed(True)
412            self._committed = True
413        else:
414            self._committed = False
415            if self.parent is not None:
416                self.parent.set_committed(False)

Cache whether this collection has an API server record

Set this collection’s cached “committed” flag to the given value and propagates it as needed.

@synchronized
def keys(self) -> Iterator[str]:
466    @synchronized
467    def keys(self) -> Iterator[str]:
468        """Iterate names of streams and files in this collection
469
470        This method does not recurse. It only iterates the contents of this
471        collection's corresponding stream.
472        """
473        return self._items.keys()

Iterate names of streams and files in this collection

This method does not recurse. It only iterates the contents of this collection’s corresponding stream.

@synchronized
def values( self) -> List[Union[arvados.arvfile.ArvadosFile, Collection]]:
475    @synchronized
476    def values(self) -> List[CollectionItem]:
477        """Get a list of objects in this collection's stream
478
479        The return value includes a `Subcollection` for every stream, and an
480        `arvados.arvfile.ArvadosFile` for every file, directly within this
481        collection's stream.  This method does not recurse.
482        """
483        return list(self._items.values())

Get a list of objects in this collection’s stream

The return value includes a Subcollection for every stream, and an arvados.arvfile.ArvadosFile for every file, directly within this collection’s stream. This method does not recurse.

@synchronized
def items( self) -> List[Tuple[str, Union[arvados.arvfile.ArvadosFile, Collection]]]:
485    @synchronized
486    def items(self) -> List[Tuple[str, CollectionItem]]:
487        """Get a list of `(name, object)` tuples from this collection's stream
488
489        The return value includes a `Subcollection` for every stream, and an
490        `arvados.arvfile.ArvadosFile` for every file, directly within this
491        collection's stream.  This method does not recurse.
492        """
493        return list(self._items.items())

Get a list of (name, object) tuples from this collection’s stream

The return value includes a Subcollection for every stream, and an arvados.arvfile.ArvadosFile for every file, directly within this collection’s stream. This method does not recurse.

def exists(self, path: str) -> bool:
495    def exists(self, path: str) -> bool:
496        """Indicate whether this collection includes an item at `path`
497
498        This method returns `True` if `path` refers to a stream or file within
499        this collection, else `False`.
500
501        Arguments:
502
503        * path: str --- The path to check for existence within this collection
504        """
505        return self.find(path) is not None

Indicate whether this collection includes an item at path

This method returns True if path refers to a stream or file within this collection, else False.

Arguments:

  • path: str — The path to check for existence within this collection
@must_be_writable
@synchronized
def remove(self, path: str, recursive: bool = False) -> None:
507    @must_be_writable
508    @synchronized
509    def remove(self, path: str, recursive: bool=False) -> None:
510        """Remove the file or stream at `path`
511
512        Arguments:
513
514        * path: str --- The path of the item to remove from the collection
515
516        * recursive: bool --- Controls the method's behavior if `path` refers
517          to a nonempty stream. If `False` (the default), this method raises
518          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
519          items under the stream.
520        """
521        if not path:
522            raise errors.ArgumentError("Parameter 'path' is empty.")
523
524        pathcomponents = path.split("/", 1)
525        item = self._items.get(pathcomponents[0])
526        if item is None:
527            raise IOError(errno.ENOENT, "File not found", path)
528        if len(pathcomponents) == 1:
529            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
530                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
531            deleteditem = self._items[pathcomponents[0]]
532            del self._items[pathcomponents[0]]
533            self.set_committed(False)
534            self.notify(DEL, self, pathcomponents[0], deleteditem)
535        else:
536            item.remove(pathcomponents[1], recursive=recursive)

Remove the file or stream at path

Arguments:

  • path: str — The path of the item to remove from the collection

  • recursive: bool — Controls the method’s behavior if path refers to a nonempty stream. If False (the default), this method raises OSError with errno ENOTEMPTY. If True, this method removes all items under the stream.

def clone(self):
542    def clone(self):
543        raise NotImplementedError()
@must_be_writable
@synchronized
def add( self, source_obj: Union[arvados.arvfile.ArvadosFile, Collection], target_name: str, overwrite: bool = False, reparent: bool = False) -> None:
545    @must_be_writable
546    @synchronized
547    def add(
548            self,
549            source_obj: CollectionItem,
550            target_name: str,
551            overwrite: bool=False,
552            reparent: bool=False,
553    ) -> None:
554        """Copy or move a file or subcollection object to this collection
555
556        Arguments:
557
558        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
559          to add to this collection
560
561        * target_name: str --- The path inside this collection where
562          `source_obj` should be added.
563
564        * overwrite: bool --- Controls the behavior of this method when the
565          collection already contains an object at `target_name`. If `False`
566          (the default), this method will raise `FileExistsError`. If `True`,
567          the object at `target_name` will be replaced with `source_obj`.
568
569        * reparent: bool --- Controls whether this method copies or moves
570          `source_obj`. If `False` (the default), `source_obj` is copied into
571          this collection. If `True`, `source_obj` is moved into this
572          collection.
573        """
574        if target_name in self and not overwrite:
575            raise IOError(errno.EEXIST, "File already exists", target_name)
576
577        modified_from = None
578        if target_name in self:
579            modified_from = self[target_name]
580
581        # Actually make the move or copy.
582        if reparent:
583            source_obj._reparent(self, target_name)
584            item = source_obj
585        else:
586            item = source_obj.clone(self, target_name)
587
588        self._items[target_name] = item
589        self.set_committed(False)
590        if not self._has_remote_blocks and source_obj.has_remote_blocks():
591            self.set_has_remote_blocks(True)
592
593        if modified_from:
594            self.notify(MOD, self, target_name, (modified_from, item))
595        else:
596            self.notify(ADD, self, target_name, item)

Copy or move a file or subcollection object to this collection

Arguments:

  • source_obj: arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection

  • target_name: str — The path inside this collection where source_obj should be added.

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_name. If False (the default), this method will raise FileExistsError. If True, the object at target_name will be replaced with source_obj.

  • reparent: bool — Controls whether this method copies or moves source_obj. If False (the default), source_obj is copied into this collection. If True, source_obj is moved into this collection.

@must_be_writable
@synchronized
def copy( self, source: Union[str, arvados.arvfile.ArvadosFile, Collection], target_path: str, source_collection: Optional[RichCollectionBase] = None, overwrite: bool = False) -> None:
638    @must_be_writable
639    @synchronized
640    def copy(
641            self,
642            source: Union[str, CollectionItem],
643            target_path: str,
644            source_collection: Optional['RichCollectionBase']=None,
645            overwrite: bool=False,
646    ) -> None:
647        """Copy a file or subcollection object to this collection
648
649        Arguments:
650
651        * source: str | arvados.arvfile.ArvadosFile |
652          arvados.collection.Subcollection --- The file or subcollection to
653          add to this collection. If `source` is a str, the object will be
654          found by looking up this path from `source_collection` (see
655          below).
656
657        * target_path: str --- The path inside this collection where the
658          source object should be added.
659
660        * source_collection: arvados.collection.Collection | None --- The
661          collection to find the source object from when `source` is a
662          path. Defaults to the current collection (`self`).
663
664        * overwrite: bool --- Controls the behavior of this method when the
665          collection already contains an object at `target_path`. If `False`
666          (the default), this method will raise `FileExistsError`. If `True`,
667          the object at `target_path` will be replaced with `source_obj`.
668        """
669        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
670        target_dir.add(source_obj, target_name, overwrite, False)

Copy a file or subcollection object to this collection

Arguments:

  • source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If source is a str, the object will be found by looking up this path from source_collection (see below).

  • target_path: str — The path inside this collection where the source object should be added.

  • source_collection: Collection | None — The collection to find the source object from when source is a path. Defaults to the current collection (self).

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_path. If False (the default), this method will raise FileExistsError. If True, the object at target_path will be replaced with source_obj.

@must_be_writable
@synchronized
def rename( self, source: Union[str, arvados.arvfile.ArvadosFile, Collection], target_path: str, source_collection: Optional[RichCollectionBase] = None, overwrite: bool = False) -> None:
672    @must_be_writable
673    @synchronized
674    def rename(
675            self,
676            source: Union[str, CollectionItem],
677            target_path: str,
678            source_collection: Optional['RichCollectionBase']=None,
679            overwrite: bool=False,
680    ) -> None:
681        """Move a file or subcollection object to this collection
682
683        Arguments:
684
685        * source: str | arvados.arvfile.ArvadosFile |
686          arvados.collection.Subcollection --- The file or subcollection to
687          add to this collection. If `source` is a str, the object will be
688          found by looking up this path from `source_collection` (see
689          below).
690
691        * target_path: str --- The path inside this collection where the
692          source object should be added.
693
694        * source_collection: arvados.collection.Collection | None --- The
695          collection to find the source object from when `source` is a
696          path. Defaults to the current collection (`self`).
697
698        * overwrite: bool --- Controls the behavior of this method when the
699          collection already contains an object at `target_path`. If `False`
700          (the default), this method will raise `FileExistsError`. If `True`,
701          the object at `target_path` will be replaced with `source_obj`.
702        """
703        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
704        if not source_obj.writable():
705            raise IOError(errno.EROFS, "Source collection is read only", source)
706        target_dir.add(source_obj, target_name, overwrite, True)

Move a file or subcollection object to this collection

Arguments:

  • source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If source is a str, the object will be found by looking up this path from source_collection (see below).

  • target_path: str — The path inside this collection where the source object should be added.

  • source_collection: Collection | None — The collection to find the source object from when source is a path. Defaults to the current collection (self).

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_path. If False (the default), this method will raise FileExistsError. If True, the object at target_path will be replaced with source_obj.

def portable_manifest_text(self, stream_name: str = '.') -> str:
708    def portable_manifest_text(self, stream_name: str=".") -> str:
709        """Get the portable manifest text for this collection
710
711        The portable manifest text is normalized, and does not include access
712        tokens. This method does not flush outstanding blocks to Keep.
713
714        Arguments:
715
716        * stream_name: str --- The name to use for this collection's stream in
717          the generated manifest. Default `'.'`.
718        """
719        return self._get_manifest_text(stream_name, True, True)

Get the portable manifest text for this collection

The portable manifest text is normalized, and does not include access tokens. This method does not flush outstanding blocks to Keep.

Arguments:

  • stream_name: str — The name to use for this collection’s stream in the generated manifest. Default '.'.
@synchronized
def manifest_text( self, stream_name: str = '.', strip: bool = False, normalize: bool = False, only_committed: bool = False) -> str:
721    @synchronized
722    def manifest_text(
723            self,
724            stream_name: str=".",
725            strip: bool=False,
726            normalize: bool=False,
727            only_committed: bool=False,
728    ) -> str:
729        """Get the manifest text for this collection
730
731        Arguments:
732
733        * stream_name: str --- The name to use for this collection's stream in
734          the generated manifest. Default `'.'`.
735
736        * strip: bool --- Controls whether or not the returned manifest text
737          includes access tokens. If `False` (the default), the manifest text
738          will include access tokens. If `True`, the manifest text will not
739          include access tokens.
740
741        * normalize: bool --- Controls whether or not the returned manifest
742          text is normalized. Default `False`.
743
744        * only_committed: bool --- Controls whether or not this method uploads
745          pending data to Keep before building and returning the manifest text.
746          If `False` (the default), this method will finish uploading all data
747          to Keep, then return the final manifest. If `True`, this method will
748          build and return a manifest that only refers to the data that has
749          finished uploading at the time this method was called.
750        """
751        if not only_committed:
752            self._my_block_manager().commit_all()
753        return self._get_manifest_text(stream_name, strip, normalize,
754                                       only_committed=only_committed)

Get the manifest text for this collection

Arguments:

  • stream_name: str — The name to use for this collection’s stream in the generated manifest. Default '.'.

  • strip: bool — Controls whether or not the returned manifest text includes access tokens. If False (the default), the manifest text will include access tokens. If True, the manifest text will not include access tokens.

  • normalize: bool — Controls whether or not the returned manifest text is normalized. Default False.

  • only_committed: bool — Controls whether or not this method uploads pending data to Keep before building and returning the manifest text. If False (the default), this method will finish uploading all data to Keep, then return the final manifest. If True, this method will build and return a manifest that only refers to the data that has finished uploading at the time this method was called.

@synchronized
def diff( self, end_collection: RichCollectionBase, prefix: str = '.', holding_collection: Optional[Collection] = None) -> List[Union[Tuple[Literal['add', 'del'], str, Collection], Tuple[Literal['mod', 'tok'], str, Collection, Collection]]]:
832    @synchronized
833    def diff(
834            self,
835            end_collection: 'RichCollectionBase',
836            prefix: str=".",
837            holding_collection: Optional['Collection']=None,
838    ) -> ChangeList:
839        """Build a list of differences between this collection and another
840
841        Arguments:
842
843        * end_collection: arvados.collection.RichCollectionBase --- A
844          collection object with the desired end state. The returned diff
845          list will describe how to go from the current collection object
846          `self` to `end_collection`.
847
848        * prefix: str --- The name to use for this collection's stream in
849          the diff list. Default `'.'`.
850
851        * holding_collection: arvados.collection.Collection | None --- A
852          collection object used to hold objects for the returned diff
853          list. By default, a new empty collection is created.
854        """
855        changes = []
856        if holding_collection is None:
857            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
858        for k in self:
859            if k not in end_collection:
860               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
861        for k in end_collection:
862            if k in self:
863                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
864                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
865                elif end_collection[k] != self[k]:
866                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
867                else:
868                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
869            else:
870                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
871        return changes

Build a list of differences between this collection and another

Arguments:

  • end_collection: RichCollectionBase — A collection object with the desired end state. The returned diff list will describe how to go from the current collection object self to end_collection.

  • prefix: str — The name to use for this collection’s stream in the diff list. Default '.'.

  • holding_collection: Collection | None — A collection object used to hold objects for the returned diff list. By default, a new empty collection is created.

@must_be_writable
@synchronized
def apply( self, changes: List[Union[Tuple[Literal['add', 'del'], str, Collection], Tuple[Literal['mod', 'tok'], str, Collection, Collection]]]) -> None:
873    @must_be_writable
874    @synchronized
875    def apply(self, changes: ChangeList) -> None:
876        """Apply a list of changes from to this collection
877
878        This method takes a list of changes generated by
879        `RichCollectionBase.diff` and applies it to this
880        collection. Afterward, the state of this collection object will
881        match the state of `end_collection` passed to `diff`. If a change
882        conflicts with a local change, it will be saved to an alternate path
883        indicating the conflict.
884
885        Arguments:
886
887        * changes: arvados.collection.ChangeList --- The list of differences
888          generated by `RichCollectionBase.diff`.
889        """
890        if changes:
891            self.set_committed(False)
892        for change in changes:
893            event_type = change[0]
894            path = change[1]
895            initial = change[2]
896            local = self.find(path)
897            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
898                                                                    time.gmtime()))
899            if event_type == ADD:
900                if local is None:
901                    # No local file at path, safe to copy over new file
902                    self.copy(initial, path)
903                elif local is not None and local != initial:
904                    # There is already local file and it is different:
905                    # save change to conflict file.
906                    self.copy(initial, conflictpath)
907            elif event_type == MOD or event_type == TOK:
908                final = change[3]
909                if local == initial:
910                    # Local matches the "initial" item so it has not
911                    # changed locally and is safe to update.
912                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
913                        # Replace contents of local file with new contents
914                        local.replace_contents(final)
915                    else:
916                        # Overwrite path with new item; this can happen if
917                        # path was a file and is now a collection or vice versa
918                        self.copy(final, path, overwrite=True)
919                elif event_type == MOD:
920                    # Local doesn't match the "start" value or local
921                    # is missing (presumably deleted) so save change
922                    # to conflict file.  Don't do this for TOK events
923                    # which means the file didn't change but only had
924                    # tokens updated.
925                    self.copy(final, conflictpath)
926            elif event_type == DEL:
927                if local == initial:
928                    # Local item matches "initial" value, so it is safe to remove.
929                    self.remove(path, recursive=True)
930                # else, the file is modified or already removed, in either
931                # case we don't want to try to remove it.

Apply a list of changes to this collection

This method takes a list of changes generated by RichCollectionBase.diff and applies it to this collection. Afterward, the state of this collection object will match the state of end_collection passed to diff. If a change conflicts with a local change, it will be saved to an alternate path indicating the conflict.

Arguments:

  • changes: ChangeList — The list of differences generated by RichCollectionBase.diff.

def portable_data_hash(self) -> str:
933    def portable_data_hash(self) -> str:
934        """Get the portable data hash for this collection's manifest"""
935        if self._manifest_locator and self.committed():
936            # If the collection is already saved on the API server, and it's committed
937            # then return API server's PDH response.
938            return self._portable_data_hash
939        else:
940            stripped = self.portable_manifest_text().encode()
941            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

Get the portable data hash for this collection’s manifest

@synchronized
def subscribe( self, callback: Callable[[Literal['add', 'del', 'mod', 'tok'], Collection, str, Union[arvados.arvfile.ArvadosFile, Collection]], object]) -> None:
943    @synchronized
944    def subscribe(self, callback: ChangeCallback) -> None:
945        """Set a notify callback for changes to this collection
946
947        Arguments:
948
949        * callback: arvados.collection.ChangeCallback --- The callable to
950          call each time the collection is changed.
951        """
952        if self._callback is None:
953            self._callback = callback
954        else:
955            raise errors.ArgumentError("A callback is already set on this collection.")

Set a notify callback for changes to this collection

Arguments:

  • callback: ChangeCallback — The callable to call each time the collection is changed.

@synchronized
def unsubscribe(self) -> None:
957    @synchronized
958    def unsubscribe(self) -> None:
959        """Remove any notify callback set for changes to this collection"""
960        if self._callback is not None:
961            self._callback = None

Remove any notify callback set for changes to this collection

@synchronized
def notify( self, event: Literal['add', 'del', 'mod', 'tok'], collection: RichCollectionBase, name: str, item: Union[arvados.arvfile.ArvadosFile, Collection]) -> None:
 963    @synchronized
 964    def notify(
 965            self,
 966            event: ChangeType,
 967            collection: 'RichCollectionBase',
 968            name: str,
 969            item: CollectionItem,
 970    ) -> None:
 971        """Notify any subscribed callback about a change to this collection
 972
 973        .. ATTENTION:: Internal
 974           This method is only meant to be used by other Collection methods.
 975
 976        If a callback has been registered with `RichCollectionBase.subscribe`,
 977        it will be called with information about a change to this collection.
 978        Then this notification will be propagated to this collection's root.
 979
 980        Arguments:
 981
 982        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 983          the collection.
 984
 985        * collection: arvados.collection.RichCollectionBase --- The
 986          collection that was modified.
 987
 988        * name: str --- The name of the file or stream within `collection` that
 989          was modified.
 990
 991        * item: arvados.arvfile.ArvadosFile |
 992          arvados.collection.Subcollection --- For ADD events, the new
 993          contents at `name` within `collection`; for DEL events, the
 994          item that was removed.  For MOD and TOK events, a 2-tuple of
 995          the previous item and the new item (may be the same object
 996          or different, depending on whether the action involved it
 997          being modified in place or replaced).
 998
 999        """
1000        if self._callback:
1001            self._callback(event, collection, name, item)
1002        self.root_collection().notify(event, collection, name, item)

Notify any subscribed callback about a change to this collection

If a callback has been registered with RichCollectionBase.subscribe, it will be called with information about a change to this collection. Then this notification will be propagated to this collection’s root.

Arguments:

  • event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.

  • collection: RichCollectionBase — The collection that was modified.

  • name: str — The name of the file or stream within collection that was modified.

  • item: arvados.arvfile.ArvadosFile | Subcollection — For ADD events, the new contents at name within collection; for DEL events, the item that was removed. For MOD and TOK events, a 2-tuple of the previous item and the new item (may be the same object or different, depending on whether the action involved it being modified in place or replaced).

@synchronized
def flush(self) -> None:
1024    @synchronized
1025    def flush(self) -> None:
1026        """Upload any pending data to Keep"""
1027        for e in self.values():
1028            e.flush()

Upload any pending data to Keep

Inherited Members
CollectionBase
stripped_manifest
class Collection(RichCollectionBase):
1031class Collection(RichCollectionBase):
1032    """Read and manipulate an Arvados collection
1033
1034    This class provides a high-level interface to create, read, and update
1035    Arvados collections and their contents. Refer to the Arvados Python SDK
1036    cookbook for [an introduction to using the Collection class][cookbook].
1037
1038    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1039    """
1040
1041    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1042                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1043                 keep_client: Optional['arvados.keep.KeepClient']=None,
1044                 num_retries: int=10,
1045                 parent: Optional['Collection']=None,
1046                 apiconfig: Optional[Mapping[str, str]]=None,
1047                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1048                 replication_desired: Optional[int]=None,
1049                 storage_classes_desired: Optional[List[str]]=None,
1050                 put_threads: Optional[int]=None):
1051        """Initialize a Collection object
1052
1053        Arguments:
1054
1055        * manifest_locator_or_text: str | None --- This string can contain a
1056          collection manifest text, portable data hash, or UUID. When given a
1057          portable data hash or UUID, this instance will load a collection
1058          record from the API server. Otherwise, this instance will represent a
1059          new collection without an API server record. The default value `None`
1060          instantiates a new collection with an empty manifest.
1061
1062        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1063          Arvados API client object this instance uses to make requests. If
1064          none is given, this instance creates its own client using the
1065          settings from `apiconfig` (see below). If your client instantiates
1066          many Collection objects, you can help limit memory utilization by
1067          calling `arvados.api.api` to construct an
1068          `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
1069          for every Collection.
1070
1071        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1072          object this instance uses to make requests. If none is given, this
1073          instance creates its own client using its `api_client`.
1074
1075        * num_retries: int --- The number of times that client requests are
1076          retried. Default 10.
1077
1078        * parent: arvados.collection.Collection | None --- The parent Collection
1079          object of this instance, if any. This argument is primarily used by
1080          other Collection methods; user client code shouldn't need to use it.
1081
1082        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1083          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1084          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1085          Collection object constructs one from these settings. If no
1086          mapping is provided, calls `arvados.config.settings` to get these
1087          parameters from user configuration.
1088
1089        * block_manager: arvados.arvfile._BlockManager | None --- The
1090          _BlockManager object used by this instance to coordinate reading
1091          and writing Keep data blocks. If none is given, this instance
1092          constructs its own. This argument is primarily used by other
1093          Collection methods; user client code shouldn't need to use it.
1094
1095        * replication_desired: int | None --- This controls both the value of
1096          the `replication_desired` field on API collection records saved by
1097          this class, as well as the number of Keep services that the object
1098          writes new data blocks to. If none is given, uses the default value
1099          configured for the cluster.
1100
1101        * storage_classes_desired: list[str] | None --- This controls both
1102          the value of the `storage_classes_desired` field on API collection
1103          records saved by this class, as well as selecting which specific
1104          Keep services the object writes new data blocks to. If none is
1105          given, defaults to an empty list.
1106
1107        * put_threads: int | None --- The number of threads to run
1108          simultaneously to upload data blocks to Keep. This value is used when
1109          building a new `block_manager`. It is unused when a `block_manager`
1110          is provided.
1111        """
1112
1113        if storage_classes_desired and type(storage_classes_desired) is not list:
1114            raise errors.ArgumentError("storage_classes_desired must be list type.")
1115
1116        super(Collection, self).__init__(parent)
1117        self._api_client = api_client
1118        self._keep_client = keep_client
1119
1120        # Use the keep client from ThreadSafeAPIClient
1121        if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
1122            self._keep_client = self._api_client.keep
1123
1124        self._block_manager = block_manager
1125        self.replication_desired = replication_desired
1126        self._storage_classes_desired = storage_classes_desired
1127        self.put_threads = put_threads
1128
1129        if apiconfig:
1130            self._config = apiconfig
1131        else:
1132            self._config = config.settings()
1133
1134        self.num_retries = num_retries
1135        self._manifest_locator = None
1136        self._manifest_text = None
1137        self._portable_data_hash = None
1138        self._api_response = None
1139        self._token_refresh_timestamp = 0
1140
1141        self.lock = threading.RLock()
1142        self.events = None
1143
1144        if manifest_locator_or_text:
1145            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1146                self._manifest_locator = manifest_locator_or_text
1147            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1148                self._manifest_locator = manifest_locator_or_text
1149                if not self._has_local_collection_uuid():
1150                    self._has_remote_blocks = True
1151            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1152                self._manifest_text = manifest_locator_or_text
1153                if '+R' in self._manifest_text:
1154                    self._has_remote_blocks = True
1155            else:
1156                raise errors.ArgumentError(
1157                    "Argument to CollectionReader is not a manifest or a collection UUID")
1158
1159            try:
1160                self._populate()
1161            except errors.SyntaxError as e:
1162                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1163
1164    def storage_classes_desired(self) -> List[str]:
1165        """Get this collection's `storage_classes_desired` value"""
1166        return self._storage_classes_desired or []
1167
    def root_collection(self) -> 'Collection':
        """Return this object as the root collection"""
        return self
1170
1171    def get_properties(self) -> Properties:
1172        """Get this collection's properties
1173
1174        This method always returns a dict. If this collection object does not
1175        have an associated API record, or that record does not have any
1176        properties set, this method returns an empty dict.
1177        """
1178        if self._api_response and self._api_response["properties"]:
1179            return self._api_response["properties"]
1180        else:
1181            return {}
1182
1183    def get_trash_at(self) -> Optional[datetime.datetime]:
1184        """Get this collection's `trash_at` field
1185
1186        This method parses the `trash_at` field of the collection's API
1187        record and returns a datetime from it. If that field is not set, or
1188        this collection object does not have an associated API record,
1189        returns None.
1190        """
1191        if self._api_response and self._api_response["trash_at"]:
1192            try:
1193                return ciso8601.parse_datetime(self._api_response["trash_at"])
1194            except ValueError:
1195                return None
1196        else:
1197            return None
1198
    def stream_name(self) -> str:
        """Return the stream name for this collection, which is always `"."`"""
        return "."
1201
    def writable(self) -> bool:
        """Report whether this collection can be modified; always `True` here"""
        return True
1204
    @synchronized
    @retry_method
    def update(
            self,
            other: Optional['Collection']=None,
            num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it.  If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
        """

        # Tokens are treated as fresh for one hour (in seconds) after the
        # last recorded refresh.
        token_refresh_period = 60*60
        time_since_last_token_refresh = (time.time() - self._token_refresh_timestamp)
        # Stays None unless the API record is reloaded below.
        upstream_response = None

        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")

            # Loaded by portable data hash and tokens were refreshed
            # recently: there is nothing to reload.
            if re.match(arvados.util.portable_data_hash_pattern, self._manifest_locator) and time_since_last_token_refresh < token_refresh_period:
                return

            upstream_response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            other = CollectionReader(upstream_response["manifest_text"])

        if self.committed():
            # 1st case, no local changes, content is the same
            if self.portable_data_hash() == other.portable_data_hash() and time_since_last_token_refresh < token_refresh_period:
                # No difference in content.  Remember the API record
                # (metadata such as name or properties may have changed)
                # but don't update the token refresh timestamp.
                if upstream_response is not None:
                    self._remember_api_response(upstream_response)
                return

            # 2nd case, no local changes, but either upstream changed
            # or we want to refresh tokens.

            self.apply(self.diff(other))
            if upstream_response is not None:
                self._remember_api_response(upstream_response)
            self._update_token_timestamp()
            # `apply` marks the collection uncommitted when it has changes
            # to make; there were no local edits, so restore the flag.
            self.set_committed(True)
            return

        # 3rd case, upstream changed, but we also have uncommitted
        # changes that we want to incorporate so they don't get lost.

        # _manifest_text stores the text from last time we received a
        # record from the API server.  This is the state of the
        # collection before our uncommitted changes.
        baseline = Collection(self._manifest_text)

        # Get the set of changes between our baseline and the other
        # collection and apply them to self.
        #
        # If a file was modified in both 'self' and 'other', the
        # 'apply' method keeps the contents of 'self' and creates a
        # conflict file with the contents of 'other'.
        self.apply(baseline.diff(other))

        # Remember the reloaded API record, so its manifest text becomes
        # the baseline for the next merge.
        if upstream_response is not None:
            self._remember_api_response(upstream_response)
1290
1291
1292    @synchronized
1293    def _my_api(self):
1294        if self._api_client is None:
1295            self._api_client = ThreadSafeAPIClient(self._config, version='v1')
1296            if self._keep_client is None:
1297                self._keep_client = self._api_client.keep
1298        return self._api_client
1299
1300    @synchronized
1301    def _my_keep(self):
1302        if self._keep_client is None:
1303            if self._api_client is None:
1304                self._my_api()
1305            else:
1306                self._keep_client = KeepClient(api_client=self._api_client)
1307        return self._keep_client
1308
1309    @synchronized
1310    def _my_block_manager(self):
1311        if self._block_manager is None:
1312            copies = (self.replication_desired or
1313                      self._my_api()._rootDesc.get('defaultCollectionReplication',
1314                                                   2))
1315            self._block_manager = _BlockManager(self._my_keep(),
1316                                                copies=copies,
1317                                                put_threads=self.put_threads,
1318                                                num_retries=self.num_retries,
1319                                                storage_classes_func=self.storage_classes_desired)
1320        return self._block_manager
1321
1322    def _remember_api_response(self, response):
1323        self._api_response = response
1324        self._manifest_text = self._api_response['manifest_text']
1325        self._portable_data_hash = self._api_response['portable_data_hash']
1326
    def _update_token_timestamp(self):
        """Record the current time as the moment of the last token refresh"""
        self._token_refresh_timestamp = time.time()
1329
1330    def _populate_from_api_server(self):
1331        # As in KeepClient itself, we must wait until the last
1332        # possible moment to instantiate an API client, in order to
1333        # avoid tripping up clients that don't have access to an API
1334        # server.  If we do build one, make sure our Keep client uses
1335        # it.  If instantiation fails, we'll fall back to the except
1336        # clause, just like any other Collection lookup
1337        # failure. Return an exception, or None if successful.
1338        self._remember_api_response(self._my_api().collections().get(
1339            uuid=self._manifest_locator).execute(
1340                num_retries=self.num_retries))
1341
1342        # If not overriden via kwargs, we should try to load the
1343        # replication_desired and storage_classes_desired from the API server
1344        if self.replication_desired is None:
1345            self.replication_desired = self._api_response.get('replication_desired', None)
1346        if self._storage_classes_desired is None:
1347            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1348
1349    def _populate(self):
1350        if self._manifest_text is None:
1351            if self._manifest_locator is None:
1352                return
1353            else:
1354                self._populate_from_api_server()
1355        self._baseline_manifest = self._manifest_text
1356        self._import_manifest(self._manifest_text)
1357
1358    def _has_collection_uuid(self):
1359        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1360
1361    def _has_local_collection_uuid(self):
1362        return self._has_collection_uuid and \
1363            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1364
    def __enter__(self):
        """Enter a context with this collection instance, returning itself"""
        return self
1367
1368    def __exit__(self, exc_type, exc_value, traceback):
1369        """Exit a context with this collection instance
1370
1371        If no exception was raised inside the context block, and this
1372        collection is writable and has a corresponding API record, that
1373        record will be updated to match the state of this instance at the end
1374        of the block.
1375        """
1376        if exc_type is None:
1377            if self.writable() and self._has_collection_uuid():
1378                self.save()
1379        self.stop_threads()
1380
1381    def stop_threads(self) -> None:
1382        """Stop background Keep upload/download threads"""
1383        if self._block_manager is not None:
1384            self._block_manager.stop_threads()
1385
    @synchronized
    def manifest_locator(self) -> Optional[str]:
        """Get this collection's manifest locator, if any

        * If this collection instance is associated with an API record with a
          UUID, return that.
        * Otherwise, if this collection instance was loaded from an API record
          by portable data hash, return that.
        * Otherwise, return `None`.
        """
        # The stored value is returned as-is; nothing is looked up here.
        return self._manifest_locator
1397
1398    @synchronized
1399    def clone(
1400            self,
1401            new_parent: Optional['Collection']=None,
1402            new_name: Optional[str]=None,
1403            readonly: bool=False,
1404            new_config: Optional[Mapping[str, str]]=None,
1405    ) -> 'Collection':
1406        """Create a Collection object with the same contents as this instance
1407
1408        This method creates a new Collection object with contents that match
1409        this instance's. The new collection will not be associated with any API
1410        record.
1411
1412        Arguments:
1413
1414        * new_parent: arvados.collection.Collection | None --- This value is
1415          passed to the new Collection's constructor as the `parent`
1416          argument.
1417
1418        * new_name: str | None --- This value is unused.
1419
1420        * readonly: bool --- If this value is true, this method constructs and
1421          returns a `CollectionReader`. Otherwise, it returns a mutable
1422          `Collection`. Default `False`.
1423
1424        * new_config: Mapping[str, str] | None --- This value is passed to the
1425          new Collection's constructor as `apiconfig`. If no value is provided,
1426          defaults to the configuration passed to this instance's constructor.
1427        """
1428        if new_config is None:
1429            new_config = self._config
1430        if readonly:
1431            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1432        else:
1433            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1434
1435        newcollection._clonefrom(self)
1436        return newcollection
1437
    @synchronized
    def api_response(self) -> Optional[Dict[str, Any]]:
        """Get this instance's associated API record

        If this Collection instance has an associated API record, return it.
        Otherwise, return `None`.
        """
        # The stored record itself is returned, not a copy.
        return self._api_response
1446
1447    def find_or_create(
1448            self,
1449            path: str,
1450            create_type: CreateType,
1451    ) -> CollectionItem:
1452        if path == ".":
1453            return self
1454        else:
1455            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1456
1457    def find(self, path: str) -> CollectionItem:
1458        if path == ".":
1459            return self
1460        else:
1461            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1462
1463    def remove(self, path: str, recursive: bool=False) -> None:
1464        if path == ".":
1465            raise errors.ArgumentError("Cannot remove '.'")
1466        else:
1467            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1468
1469    @must_be_writable
1470    @synchronized
1471    @retry_method
1472    def save(
1473            self,
1474            properties: Optional[Properties]=None,
1475            storage_classes: Optional[StorageClasses]=None,
1476            trash_at: Optional[datetime.datetime]=None,
1477            merge: bool=True,
1478            num_retries: Optional[int]=None,
1479            preserve_version: bool=False,
1480    ) -> str:
1481        """Save collection to an existing API record
1482
1483        This method updates the instance's corresponding API record to match
1484        the instance's state. If this instance does not have a corresponding API
1485        record yet, raises `AssertionError`. (To create a new API record, use
1486        `Collection.save_new`.) This method returns the saved collection
1487        manifest.
1488
1489        Arguments:
1490
1491        * properties: dict[str, Any] | None --- If provided, the API record will
1492          be updated with these properties. Note this will completely replace
1493          any existing properties.
1494
1495        * storage_classes: list[str] | None --- If provided, the API record will
1496          be updated with this value in the `storage_classes_desired` field.
1497          This value will also be saved on the instance and used for any
1498          changes that follow.
1499
1500        * trash_at: datetime.datetime | None --- If provided, the API record
1501          will be updated with this value in the `trash_at` field.
1502
1503        * merge: bool --- If `True` (the default), this method will first
1504          reload this collection's API record, and merge any new contents into
1505          this instance before saving changes. See `Collection.update` for
1506          details.
1507
1508        * num_retries: int | None --- The number of times to retry reloading
1509          the collection's API record from the API server. If not specified,
1510          uses the `num_retries` provided when this instance was constructed.
1511
1512        * preserve_version: bool --- This value will be passed to directly
1513          to the underlying API call. If `True`, the Arvados API will
1514          preserve the versions of this collection both immediately before
1515          and after the update. If `True` when the API server is not
1516          configured with collection versioning, this method raises
1517          `arvados.errors.ArgumentError`.
1518        """
1519        if properties and type(properties) is not dict:
1520            raise errors.ArgumentError("properties must be dictionary type.")
1521
1522        if storage_classes and type(storage_classes) is not list:
1523            raise errors.ArgumentError("storage_classes must be list type.")
1524        if storage_classes:
1525            self._storage_classes_desired = storage_classes
1526
1527        if trash_at and type(trash_at) is not datetime.datetime:
1528            raise errors.ArgumentError("trash_at must be datetime type.")
1529
1530        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1531            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1532
1533        body={}
1534        if properties:
1535            body["properties"] = properties
1536        if self.storage_classes_desired():
1537            body["storage_classes_desired"] = self.storage_classes_desired()
1538        if trash_at:
1539            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1540            body["trash_at"] = t
1541        if preserve_version:
1542            body["preserve_version"] = preserve_version
1543
1544        if not self.committed():
1545            if self._has_remote_blocks:
1546                # Copy any remote blocks to the local cluster.
1547                self._copy_remote_blocks(remote_blocks={})
1548                self._has_remote_blocks = False
1549            if not self._has_collection_uuid():
1550                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1551            elif not self._has_local_collection_uuid():
1552                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1553
1554            self._my_block_manager().commit_all()
1555
1556            if merge:
1557                self.update()
1558
1559            text = self.manifest_text(strip=False)
1560            body['manifest_text'] = text
1561
1562            self._remember_api_response(self._my_api().collections().update(
1563                uuid=self._manifest_locator,
1564                body=body
1565                ).execute(num_retries=num_retries))
1566            self.set_committed(True)
1567        elif body:
1568            self._remember_api_response(self._my_api().collections().update(
1569                uuid=self._manifest_locator,
1570                body=body
1571                ).execute(num_retries=num_retries))
1572
1573        return self._manifest_text
1574
1575
1576    @must_be_writable
1577    @synchronized
1578    @retry_method
1579    def save_new(
1580            self,
1581            name: Optional[str]=None,
1582            create_collection_record: bool=True,
1583            owner_uuid: Optional[str]=None,
1584            properties: Optional[Properties]=None,
1585            storage_classes: Optional[StorageClasses]=None,
1586            trash_at: Optional[datetime.datetime]=None,
1587            ensure_unique_name: bool=False,
1588            num_retries: Optional[int]=None,
1589            preserve_version: bool=False,
1590    ):
1591        """Save collection to a new API record
1592
1593        This method finishes uploading new data blocks and (optionally)
1594        creates a new API collection record with the provided data. If a new
1595        record is created, this instance becomes associated with that record
1596        for future updates like `save()`. This method returns the saved
1597        collection manifest.
1598
1599        Arguments:
1600
1601        * name: str | None --- The `name` field to use on the new collection
1602          record. If not specified, a generic default name is generated.
1603
1604        * create_collection_record: bool --- If `True` (the default), creates a
1605          collection record on the API server. If `False`, the method finishes
1606          all data uploads and only returns the resulting collection manifest
1607          without sending it to the API server.
1608
1609        * owner_uuid: str | None --- The `owner_uuid` field to use on the
1610          new collection record.
1611
1612        * properties: dict[str, Any] | None --- The `properties` field to use on
1613          the new collection record.
1614
1615        * storage_classes: list[str] | None --- The
1616          `storage_classes_desired` field to use on the new collection record.
1617
1618        * trash_at: datetime.datetime | None --- The `trash_at` field to use
1619          on the new collection record.
1620
1621        * ensure_unique_name: bool --- This value is passed directly to the
1622          Arvados API when creating the collection record. If `True`, the API
1623          server may modify the submitted `name` to ensure the collection's
1624          `name`+`owner_uuid` combination is unique. If `False` (the default),
1625          if a collection already exists with this same `name`+`owner_uuid`
1626          combination, creating a collection record will raise a validation
1627          error.
1628
1629        * num_retries: int | None --- The number of times to retry reloading
1630          the collection's API record from the API server. If not specified,
1631          uses the `num_retries` provided when this instance was constructed.
1632
1633        * preserve_version: bool --- This value will be passed to directly
1634          to the underlying API call. If `True`, the Arvados API will
1635          preserve the versions of this collection both immediately before
1636          and after the update. If `True` when the API server is not
1637          configured with collection versioning, this method raises
1638          `arvados.errors.ArgumentError`.
1639        """
1640        if properties and type(properties) is not dict:
1641            raise errors.ArgumentError("properties must be dictionary type.")
1642
1643        if storage_classes and type(storage_classes) is not list:
1644            raise errors.ArgumentError("storage_classes must be list type.")
1645
1646        if trash_at and type(trash_at) is not datetime.datetime:
1647            raise errors.ArgumentError("trash_at must be datetime type.")
1648
1649        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1650            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1651
1652        if self._has_remote_blocks:
1653            # Copy any remote blocks to the local cluster.
1654            self._copy_remote_blocks(remote_blocks={})
1655            self._has_remote_blocks = False
1656
1657        if storage_classes:
1658            self._storage_classes_desired = storage_classes
1659
1660        self._my_block_manager().commit_all()
1661        text = self.manifest_text(strip=False)
1662
1663        if create_collection_record:
1664            if name is None:
1665                name = "New collection"
1666                ensure_unique_name = True
1667
1668            body = {"manifest_text": text,
1669                    "name": name,
1670                    "replication_desired": self.replication_desired}
1671            if owner_uuid:
1672                body["owner_uuid"] = owner_uuid
1673            if properties:
1674                body["properties"] = properties
1675            if self.storage_classes_desired():
1676                body["storage_classes_desired"] = self.storage_classes_desired()
1677            if trash_at:
1678                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1679                body["trash_at"] = t
1680            if preserve_version:
1681                body["preserve_version"] = preserve_version
1682
1683            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1684            self._manifest_locator = self._api_response["uuid"]
1685            self.set_committed(True)
1686
1687        return text
1688
    # Manifest tokenizer: a run of non-whitespace characters (group 1)
    # together with the whitespace, or end of text, that follows it (group 2).
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # Block locator: 32 lowercase hex digits, `+`, a decimal size (group 1),
    # then any number of `+`-prefixed suffixes.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # File segment token of the form `position:size:name`; _import_manifest
    # reads groups 1-3 in that order.
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1692
1693    def _unescape_manifest_path(self, path):
1694        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1695
1696    @synchronized
1697    def _import_manifest(self, manifest_text):
1698        """Import a manifest into a `Collection`.
1699
1700        :manifest_text:
1701          The manifest text to import from.
1702
1703        """
1704        if len(self) > 0:
1705            raise ArgumentError("Can only import manifest into an empty collection")
1706
1707        STREAM_NAME = 0
1708        BLOCKS = 1
1709        SEGMENTS = 2
1710
1711        stream_name = None
1712        state = STREAM_NAME
1713
1714        for token_and_separator in self._token_re.finditer(manifest_text):
1715            tok = token_and_separator.group(1)
1716            sep = token_and_separator.group(2)
1717
1718            if state == STREAM_NAME:
1719                # starting a new stream
1720                stream_name = self._unescape_manifest_path(tok)
1721                blocks = []
1722                segments = []
1723                streamoffset = 0
1724                state = BLOCKS
1725                self.find_or_create(stream_name, COLLECTION)
1726                continue
1727
1728            if state == BLOCKS:
1729                block_locator = self._block_re.match(tok)
1730                if block_locator:
1731                    blocksize = int(block_locator.group(1))
1732                    blocks.append(streams.Range(tok, streamoffset, blocksize, 0))
1733                    streamoffset += blocksize
1734                else:
1735                    state = SEGMENTS
1736
1737            if state == SEGMENTS:
1738                file_segment = self._segment_re.match(tok)
1739                if file_segment:
1740                    pos = int(file_segment.group(1))
1741                    size = int(file_segment.group(2))
1742                    name = self._unescape_manifest_path(file_segment.group(3))
1743                    if name.split('/')[-1] == '.':
1744                        # placeholder for persisting an empty directory, not a real file
1745                        if len(name) > 2:
1746                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1747                    else:
1748                        filepath = os.path.join(stream_name, name)
1749                        try:
1750                            afile = self.find_or_create(filepath, FILE)
1751                        except IOError as e:
1752                            if e.errno == errno.ENOTDIR:
1753                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1754                            else:
1755                                raise e from None
1756                        if isinstance(afile, ArvadosFile):
1757                            afile.add_segment(blocks, pos, size)
1758                        else:
1759                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1760                else:
1761                    # error!
1762                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1763
1764            if sep == "\n":
1765                stream_name = None
1766                state = STREAM_NAME
1767
1768        self._update_token_timestamp()
1769        self.set_committed(True)
1770
1771    @synchronized
1772    def notify(
1773            self,
1774            event: ChangeType,
1775            collection: 'RichCollectionBase',
1776            name: str,
1777            item: CollectionItem,
1778    ) -> None:
1779        if self._callback:
1780            self._callback(event, collection, name, item)

Read and manipulate an Arvados collection

This class provides a high-level interface to create, read, and update Arvados collections and their contents. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.

Collection( manifest_locator_or_text: Optional[str] = None, api_client: Optional[arvados.api_resources.ArvadosAPIClient] = None, keep_client: Optional[arvados.keep.KeepClient] = None, num_retries: int = 10, parent: Optional[Collection] = None, apiconfig: Optional[Mapping[str, str]] = None, block_manager: Optional[arvados.arvfile._BlockManager] = None, replication_desired: Optional[int] = None, storage_classes_desired: Optional[List[str]] = None, put_threads: Optional[int] = None)
1041    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1042                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1043                 keep_client: Optional['arvados.keep.KeepClient']=None,
1044                 num_retries: int=10,
1045                 parent: Optional['Collection']=None,
1046                 apiconfig: Optional[Mapping[str, str]]=None,
1047                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1048                 replication_desired: Optional[int]=None,
1049                 storage_classes_desired: Optional[List[str]]=None,
1050                 put_threads: Optional[int]=None):
1051        """Initialize a Collection object
1052
1053        Arguments:
1054
1055        * manifest_locator_or_text: str | None --- This string can contain a
1056          collection manifest text, portable data hash, or UUID. When given a
1057          portable data hash or UUID, this instance will load a collection
1058          record from the API server. Otherwise, this instance will represent a
1059          new collection without an API server record. The default value `None`
1060          instantiates a new collection with an empty manifest.
1061
1062        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1063          Arvados API client object this instance uses to make requests. If
1064          none is given, this instance creates its own client using the
1065          settings from `apiconfig` (see below). If your client instantiates
1066          many Collection objects, you can help limit memory utilization by
1067          calling `arvados.api.api` to construct an
1068          `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
1069          for every Collection.
1070
1071        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1072          object this instance uses to make requests. If none is given, this
1073          instance creates its own client using its `api_client`.
1074
1075        * num_retries: int --- The number of times that client requests are
1076          retried. Default 10.
1077
1078        * parent: arvados.collection.Collection | None --- The parent Collection
1079          object of this instance, if any. This argument is primarily used by
1080          other Collection methods; user client code shouldn't need to use it.
1081
1082        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1083          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1084          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1085          Collection object constructs one from these settings. If no
1086          mapping is provided, calls `arvados.config.settings` to get these
1087          parameters from user configuration.
1088
1089        * block_manager: arvados.arvfile._BlockManager | None --- The
1090          _BlockManager object used by this instance to coordinate reading
1091          and writing Keep data blocks. If none is given, this instance
1092          constructs its own. This argument is primarily used by other
1093          Collection methods; user client code shouldn't need to use it.
1094
1095        * replication_desired: int | None --- This controls both the value of
1096          the `replication_desired` field on API collection records saved by
1097          this class, as well as the number of Keep services that the object
1098          writes new data blocks to. If none is given, uses the default value
1099          configured for the cluster.
1100
1101        * storage_classes_desired: list[str] | None --- This controls both
1102          the value of the `storage_classes_desired` field on API collection
1103          records saved by this class, as well as selecting which specific
1104          Keep services the object writes new data blocks to. If none is
1105          given, defaults to an empty list.
1106
1107        * put_threads: int | None --- The number of threads to run
1108          simultaneously to upload data blocks to Keep. This value is used when
1109          building a new `block_manager`. It is unused when a `block_manager`
1110          is provided.
1111        """
1112
1113        if storage_classes_desired and type(storage_classes_desired) is not list:
1114            raise errors.ArgumentError("storage_classes_desired must be list type.")
1115
1116        super(Collection, self).__init__(parent)
1117        self._api_client = api_client
1118        self._keep_client = keep_client
1119
1120        # Use the keep client from ThreadSafeAPIClient
1121        if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
1122            self._keep_client = self._api_client.keep
1123
1124        self._block_manager = block_manager
1125        self.replication_desired = replication_desired
1126        self._storage_classes_desired = storage_classes_desired
1127        self.put_threads = put_threads
1128
1129        if apiconfig:
1130            self._config = apiconfig
1131        else:
1132            self._config = config.settings()
1133
1134        self.num_retries = num_retries
1135        self._manifest_locator = None
1136        self._manifest_text = None
1137        self._portable_data_hash = None
1138        self._api_response = None
1139        self._token_refresh_timestamp = 0
1140
1141        self.lock = threading.RLock()
1142        self.events = None
1143
1144        if manifest_locator_or_text:
1145            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1146                self._manifest_locator = manifest_locator_or_text
1147            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1148                self._manifest_locator = manifest_locator_or_text
1149                if not self._has_local_collection_uuid():
1150                    self._has_remote_blocks = True
1151            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1152                self._manifest_text = manifest_locator_or_text
1153                if '+R' in self._manifest_text:
1154                    self._has_remote_blocks = True
1155            else:
1156                raise errors.ArgumentError(
1157                    "Argument to CollectionReader is not a manifest or a collection UUID")
1158
1159            try:
1160                self._populate()
1161            except errors.SyntaxError as e:
1162                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None

Initialize a Collection object

Arguments:

  • manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value None instantiates a new collection with an empty manifest.

  • api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from apiconfig (see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling arvados.api.api to construct an arvados.api.ThreadSafeAPIClient, and use that as the api_client for every Collection.

  • keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its api_client.

  • num_retries: int — The number of times that client requests are retried. Default 10.

  • parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • apiconfig: Mapping[str, str] | None — A mapping with entries for ARVADOS_API_HOST, ARVADOS_API_TOKEN, and optionally ARVADOS_API_HOST_INSECURE. When no api_client is provided, the Collection object constructs one from these settings. If no mapping is provided, calls arvados.config.settings to get these parameters from user configuration.

  • block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • replication_desired: int | None — This controls both the value of the replication_desired field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.

  • storage_classes_desired: list[str] | None — This controls both the value of the storage_classes_desired field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.

  • put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new block_manager. It is unused when a block_manager is provided.

replication_desired
put_threads
num_retries
lock
events
def storage_classes_desired(self) -> List[str]:
1164    def storage_classes_desired(self) -> List[str]:
1165        """Get this collection's `storage_classes_desired` value"""
1166        return self._storage_classes_desired or []

Get this collection’s storage_classes_desired value

def root_collection(self) -> Collection:
    def root_collection(self) -> 'Collection':
        """Get this collection's root collection object

        If you open a subcollection with `Collection.find`, calling this
        method on that subcollection returns the source Collection object.
        This implementation returns the instance itself.
        """
        return self

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def get_properties(self) -> Dict[str, Any]:
1171    def get_properties(self) -> Properties:
1172        """Get this collection's properties
1173
1174        This method always returns a dict. If this collection object does not
1175        have an associated API record, or that record does not have any
1176        properties set, this method returns an empty dict.
1177        """
1178        if self._api_response and self._api_response["properties"]:
1179            return self._api_response["properties"]
1180        else:
1181            return {}

Get this collection’s properties

This method always returns a dict. If this collection object does not have an associated API record, or that record does not have any properties set, this method returns an empty dict.

def get_trash_at(self) -> Optional[datetime.datetime]:
1183    def get_trash_at(self) -> Optional[datetime.datetime]:
1184        """Get this collection's `trash_at` field
1185
1186        This method parses the `trash_at` field of the collection's API
1187        record and returns a datetime from it. If that field is not set, or
1188        this collection object does not have an associated API record,
1189        returns None.
1190        """
1191        if self._api_response and self._api_response["trash_at"]:
1192            try:
1193                return ciso8601.parse_datetime(self._api_response["trash_at"])
1194            except ValueError:
1195                return None
1196        else:
1197            return None

Get this collection’s trash_at field

This method parses the trash_at field of the collection’s API record and returns a datetime from it. If that field is not set, or this collection object does not have an associated API record, returns None.

def stream_name(self) -> str:
    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this collection

        If you open a subcollection with `Collection.find`, calling this
        method on that subcollection returns the name of the stream you
        opened. This implementation always returns `"."`.
        """
        return "."

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

def writable(self) -> bool:
    def writable(self) -> bool:
        """Indicate whether this collection object can be modified

        This method returns `False` if this object is a `CollectionReader`,
        else `True`. This implementation always returns `True`.
        """
        return True

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

@synchronized
@retry_method
def update( self, other: Optional[Collection] = None, num_retries: Optional[int] = None) -> None:
1205    @synchronized
1206    @retry_method
1207    def update(
1208            self,
1209            other: Optional['Collection']=None,
1210            num_retries: Optional[int]=None,
1211    ) -> None:
1212        """Merge another collection's contents into this one
1213
1214        This method compares the manifest of this collection instance with
1215        another, then updates this instance's manifest with changes from the
1216        other, renaming files to flag conflicts where necessary.
1217
1218        When called without any arguments, this method reloads the collection's
1219        API record, and updates this instance with any changes that have
1220        appeared server-side. If this instance does not have a corresponding
1221        API record, this method raises `arvados.errors.ArgumentError`.
1222
1223        Arguments:
1224
1225        * other: arvados.collection.Collection | None --- The collection
1226          whose contents should be merged into this instance. When not
1227          provided, this method reloads this collection's API record and
1228          constructs a Collection object from it.  If this instance does not
1229          have a corresponding API record, this method raises
1230          `arvados.errors.ArgumentError`.
1231
1232        * num_retries: int | None --- The number of times to retry reloading
1233          the collection's API record from the API server. If not specified,
1234          uses the `num_retries` provided when this instance was constructed.
1235        """
1236
1237        token_refresh_period = 60*60
1238        time_since_last_token_refresh = (time.time() - self._token_refresh_timestamp)
1239        upstream_response = None
1240
1241        if other is None:
1242            if self._manifest_locator is None:
1243                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1244
1245            if re.match(arvados.util.portable_data_hash_pattern, self._manifest_locator) and time_since_last_token_refresh < token_refresh_period:
1246                return
1247
1248            upstream_response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1249            other = CollectionReader(upstream_response["manifest_text"])
1250
1251        if self.committed():
1252            # 1st case, no local changes, content is the same
1253            if self.portable_data_hash() == other.portable_data_hash() and time_since_last_token_refresh < token_refresh_period:
1254                # No difference in content.  Remember the API record
1255                # (metadata such as name or properties may have changed)
1256                # but don't update the token refresh timestamp.
1257                if upstream_response is not None:
1258                    self._remember_api_response(upstream_response)
1259                return
1260
1261            # 2nd case, no local changes, but either upstream changed
1262            # or we want to refresh tokens.
1263
1264            self.apply(self.diff(other))
1265            if upstream_response is not None:
1266                self._remember_api_response(upstream_response)
1267            self._update_token_timestamp()
1268            self.set_committed(True)
1269            return
1270
1271        # 3rd case, upstream changed, but we also have uncommitted
1272        # changes that we want to incorporate so they don't get lost.
1273
1274        # _manifest_text stores the text from last time we received a
1275        # record from the API server.  This is the state of the
1276        # collection before our uncommitted changes.
1277        baseline = Collection(self._manifest_text)
1278
1279        # Get the set of changes between our baseline and the other
1280        # collection and apply them to self.
1281        #
1282        # If a file was modified in both 'self' and 'other', the
1283        # 'apply' method keeps the contents of 'self' and creates a
1284        # conflict file with the contents of 'other'.
1285        self.apply(baseline.diff(other))
1286
1287        # Remember the new baseline, changes to a file
1288        if upstream_response is not None:
1289            self._remember_api_response(upstream_response)

Merge another collection’s contents into this one

This method compares the manifest of this collection instance with another, then updates this instance’s manifest with changes from the other, renaming files to flag conflicts where necessary.

When called without any arguments, this method reloads the collection’s API record, and updates this instance with any changes that have appeared server-side. If this instance does not have a corresponding API record, this method raises arvados.errors.ArgumentError.

Arguments:

  • other: Collection | None — The collection whose contents should be merged into this instance. When not provided, this method reloads this collection’s API record and constructs a Collection object from it. If this instance does not have a corresponding API record, this method raises arvados.errors.ArgumentError.

  • num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the num_retries provided when this instance was constructed.

def stop_threads(self) -> None:
1381    def stop_threads(self) -> None:
1382        """Stop background Keep upload/download threads"""
1383        if self._block_manager is not None:
1384            self._block_manager.stop_threads()

Stop background Keep upload/download threads

@synchronized
def manifest_locator(self) -> Optional[str]:
    @synchronized
    def manifest_locator(self) -> Optional[str]:
        """Get this collection's manifest locator, if any

        * If this collection instance is associated with an API record with a
          UUID, return that.
        * Otherwise, if this collection instance was loaded from an API record
          by portable data hash, return that.
        * Otherwise, return `None`.
        """
        # Set by the constructor when it is given a locator or UUID, and by
        # save_new once a record has been created.
        return self._manifest_locator

Get this collection’s manifest locator, if any

  • If this collection instance is associated with an API record with a UUID, return that.
  • Otherwise, if this collection instance was loaded from an API record by portable data hash, return that.
  • Otherwise, return None.
@synchronized
def clone( self, new_parent: Optional[Collection] = None, new_name: Optional[str] = None, readonly: bool = False, new_config: Optional[Mapping[str, str]] = None) -> Collection:
1398    @synchronized
1399    def clone(
1400            self,
1401            new_parent: Optional['Collection']=None,
1402            new_name: Optional[str]=None,
1403            readonly: bool=False,
1404            new_config: Optional[Mapping[str, str]]=None,
1405    ) -> 'Collection':
1406        """Create a Collection object with the same contents as this instance
1407
1408        This method creates a new Collection object with contents that match
1409        this instance's. The new collection will not be associated with any API
1410        record.
1411
1412        Arguments:
1413
1414        * new_parent: arvados.collection.Collection | None --- This value is
1415          passed to the new Collection's constructor as the `parent`
1416          argument.
1417
1418        * new_name: str | None --- This value is unused.
1419
1420        * readonly: bool --- If this value is true, this method constructs and
1421          returns a `CollectionReader`. Otherwise, it returns a mutable
1422          `Collection`. Default `False`.
1423
1424        * new_config: Mapping[str, str] | None --- This value is passed to the
1425          new Collection's constructor as `apiconfig`. If no value is provided,
1426          defaults to the configuration passed to this instance's constructor.
1427        """
1428        if new_config is None:
1429            new_config = self._config
1430        if readonly:
1431            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1432        else:
1433            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1434
1435        newcollection._clonefrom(self)
1436        return newcollection

Create a Collection object with the same contents as this instance

This method creates a new Collection object with contents that match this instance’s. The new collection will not be associated with any API record.

Arguments:

  • new_parent: Collection | None — This value is passed to the new Collection’s constructor as the parent argument.

  • new_name: str | None — This value is unused.

  • readonly: bool — If this value is true, this method constructs and returns a CollectionReader. Otherwise, it returns a mutable Collection. Default False.

  • new_config: Mapping[str, str] | None — This value is passed to the new Collection’s constructor as apiconfig. If no value is provided, defaults to the configuration passed to this instance’s constructor.

@synchronized
def api_response(self) -> Optional[Dict[str, Any]]:
1438    @synchronized
1439    def api_response(self) -> Optional[Dict[str, Any]]:
1440        """Get this instance's associated API record
1441
1442        If this Collection instance has an associated API record, return it.
1443        Otherwise, return `None`.
1444        """
1445        return self._api_response

Get this instance’s associated API record

If this Collection instance has an associated API record, return it. Otherwise, return None.

def find_or_create( self, path: str, create_type: Literal['collection', 'file']) -> Union[arvados.arvfile.ArvadosFile, Collection]:
1447    def find_or_create(
1448            self,
1449            path: str,
1450            create_type: CreateType,
1451    ) -> CollectionItem:
1452        if path == ".":
1453            return self
1454        else:
1455            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

Get the item at the given path, creating it if necessary

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of create_type.

Arguments:

  • path: str — The path to find or create within this collection.

  • create_type: Literal[COLLECTION, FILE] — The type of object to create at path if one does not exist. Passing COLLECTION creates a stream and returns the corresponding Subcollection. Passing FILE creates a new file and returns the corresponding arvados.arvfile.ArvadosFile.

def find( self, path: str) -> Union[arvados.arvfile.ArvadosFile, Collection]:
1457    def find(self, path: str) -> CollectionItem:
1458        if path == ".":
1459            return self
1460        else:
1461            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

Get the item at the given path

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method raises NotADirectoryError.

Arguments:

  • path: str — The path to find within this collection.
def remove(self, path: str, recursive: bool = False) -> None:
1463    def remove(self, path: str, recursive: bool=False) -> None:
1464        if path == ".":
1465            raise errors.ArgumentError("Cannot remove '.'")
1466        else:
1467            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)

Remove the file or stream at path

Arguments:

  • path: str — The path of the item to remove from the collection

  • recursive: bool — Controls the method’s behavior if path refers to a nonempty stream. If False (the default), this method raises OSError with errno ENOTEMPTY. If True, this method removes all items under the stream.

@must_be_writable
@synchronized
@retry_method
def save( self, properties: Optional[Dict[str, Any]] = None, storage_classes: Optional[List[str]] = None, trash_at: Optional[datetime.datetime] = None, merge: bool = True, num_retries: Optional[int] = None, preserve_version: bool = False) -> str:
1469    @must_be_writable
1470    @synchronized
1471    @retry_method
1472    def save(
1473            self,
1474            properties: Optional[Properties]=None,
1475            storage_classes: Optional[StorageClasses]=None,
1476            trash_at: Optional[datetime.datetime]=None,
1477            merge: bool=True,
1478            num_retries: Optional[int]=None,
1479            preserve_version: bool=False,
1480    ) -> str:
1481        """Save collection to an existing API record
1482
1483        This method updates the instance's corresponding API record to match
1484        the instance's state. If this instance does not have a corresponding API
1485        record yet, raises `AssertionError`. (To create a new API record, use
1486        `Collection.save_new`.) This method returns the saved collection
1487        manifest.
1488
1489        Arguments:
1490
1491        * properties: dict[str, Any] | None --- If provided, the API record will
1492          be updated with these properties. Note this will completely replace
1493          any existing properties.
1494
1495        * storage_classes: list[str] | None --- If provided, the API record will
1496          be updated with this value in the `storage_classes_desired` field.
1497          This value will also be saved on the instance and used for any
1498          changes that follow.
1499
1500        * trash_at: datetime.datetime | None --- If provided, the API record
1501          will be updated with this value in the `trash_at` field.
1502
1503        * merge: bool --- If `True` (the default), this method will first
1504          reload this collection's API record, and merge any new contents into
1505          this instance before saving changes. See `Collection.update` for
1506          details.
1507
1508        * num_retries: int | None --- The number of times to retry the API
1509          request that updates the collection record. If not specified,
1510          uses the `num_retries` provided when this instance was constructed.
1511
1512        * preserve_version: bool --- This value will be passed directly
1513          to the underlying API call. If `True`, the Arvados API will
1514          preserve the versions of this collection both immediately before
1515          and after the update. If `True` when the API server is not
1516          configured with collection versioning, this method raises
1517          `arvados.errors.ArgumentError`.
1518        """
1519        if properties and type(properties) is not dict:
1520            raise errors.ArgumentError("properties must be dictionary type.")
1521
1522        if storage_classes and type(storage_classes) is not list:
1523            raise errors.ArgumentError("storage_classes must be list type.")
1524        if storage_classes:
1525            self._storage_classes_desired = storage_classes
1526
1527        if trash_at and type(trash_at) is not datetime.datetime:
1528            raise errors.ArgumentError("trash_at must be datetime type.")
1529
1530        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1531            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1532
1533        body={}
1534        if properties:
1535            body["properties"] = properties
1536        if self.storage_classes_desired():
1537            body["storage_classes_desired"] = self.storage_classes_desired()
1538        if trash_at:
1539            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1540            body["trash_at"] = t
1541        if preserve_version:
1542            body["preserve_version"] = preserve_version
1543
1544        if not self.committed():
1545            if self._has_remote_blocks:
1546                # Copy any remote blocks to the local cluster.
1547                self._copy_remote_blocks(remote_blocks={})
1548                self._has_remote_blocks = False
1549            if not self._has_collection_uuid():
1550                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1551            elif not self._has_local_collection_uuid():
1552                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1553
1554            self._my_block_manager().commit_all()
1555
1556            if merge:
1557                self.update()
1558
1559            text = self.manifest_text(strip=False)
1560            body['manifest_text'] = text
1561
1562            self._remember_api_response(self._my_api().collections().update(
1563                uuid=self._manifest_locator,
1564                body=body
1565                ).execute(num_retries=num_retries))
1566            self.set_committed(True)
1567        elif body:
1568            self._remember_api_response(self._my_api().collections().update(
1569                uuid=self._manifest_locator,
1570                body=body
1571                ).execute(num_retries=num_retries))
1572
1573        return self._manifest_text

Save collection to an existing API record

This method updates the instance’s corresponding API record to match the instance’s state. If this instance does not have a corresponding API record yet, raises AssertionError. (To create a new API record, use Collection.save_new.) This method returns the saved collection manifest.

Arguments:

  • properties: dict[str, Any] | None — If provided, the API record will be updated with these properties. Note this will completely replace any existing properties.

  • storage_classes: list[str] | None — If provided, the API record will be updated with this value in the storage_classes_desired field. This value will also be saved on the instance and used for any changes that follow.

  • trash_at: datetime.datetime | None — If provided, the API record will be updated with this value in the trash_at field.

  • merge: bool — If True (the default), this method will first reload this collection’s API record, and merge any new contents into this instance before saving changes. See Collection.update for details.

  • num_retries: int | None — The number of times to retry the API request that updates the collection record. If not specified, uses the num_retries provided when this instance was constructed.

  • preserve_version: bool — This value will be passed directly to the underlying API call. If True, the Arvados API will preserve the versions of this collection both immediately before and after the update. If True when the API server is not configured with collection versioning, this method raises arvados.errors.ArgumentError.

@must_be_writable
@synchronized
@retry_method
def save_new( self, name: Optional[str] = None, create_collection_record: bool = True, owner_uuid: Optional[str] = None, properties: Optional[Dict[str, Any]] = None, storage_classes: Optional[List[str]] = None, trash_at: Optional[datetime.datetime] = None, ensure_unique_name: bool = False, num_retries: Optional[int] = None, preserve_version: bool = False):
1576    @must_be_writable
1577    @synchronized
1578    @retry_method
1579    def save_new(
1580            self,
1581            name: Optional[str]=None,
1582            create_collection_record: bool=True,
1583            owner_uuid: Optional[str]=None,
1584            properties: Optional[Properties]=None,
1585            storage_classes: Optional[StorageClasses]=None,
1586            trash_at: Optional[datetime.datetime]=None,
1587            ensure_unique_name: bool=False,
1588            num_retries: Optional[int]=None,
1589            preserve_version: bool=False,
1590    ):
1591        """Save collection to a new API record
1592
1593        This method finishes uploading new data blocks and (optionally)
1594        creates a new API collection record with the provided data. If a new
1595        record is created, this instance becomes associated with that record
1596        for future updates like `save()`. This method returns the saved
1597        collection manifest.
1598
1599        Arguments:
1600
1601        * name: str | None --- The `name` field to use on the new collection
1602          record. If not specified, a generic default name is generated.
1603
1604        * create_collection_record: bool --- If `True` (the default), creates a
1605          collection record on the API server. If `False`, the method finishes
1606          all data uploads and only returns the resulting collection manifest
1607          without sending it to the API server.
1608
1609        * owner_uuid: str | None --- The `owner_uuid` field to use on the
1610          new collection record.
1611
1612        * properties: dict[str, Any] | None --- The `properties` field to use on
1613          the new collection record.
1614
1615        * storage_classes: list[str] | None --- The
1616          `storage_classes_desired` field to use on the new collection record.
1617
1618        * trash_at: datetime.datetime | None --- The `trash_at` field to use
1619          on the new collection record.
1620
1621        * ensure_unique_name: bool --- This value is passed directly to the
1622          Arvados API when creating the collection record. If `True`, the API
1623          server may modify the submitted `name` to ensure the collection's
1624          `name`+`owner_uuid` combination is unique. If `False` (the default),
1625          if a collection already exists with this same `name`+`owner_uuid`
1626          combination, creating a collection record will raise a validation
1627          error.
1628
1629        * num_retries: int | None --- The number of times to retry the API
1630          request that creates the collection record. If not specified,
1631          uses the `num_retries` provided when this instance was constructed.
1632
1633        * preserve_version: bool --- This value will be passed directly
1634          to the underlying API call. If `True`, the Arvados API will
1635          preserve the versions of this collection both immediately before
1636          and after the update. If `True` when the API server is not
1637          configured with collection versioning, this method raises
1638          `arvados.errors.ArgumentError`.
1639        """
1640        if properties and type(properties) is not dict:
1641            raise errors.ArgumentError("properties must be dictionary type.")
1642
1643        if storage_classes and type(storage_classes) is not list:
1644            raise errors.ArgumentError("storage_classes must be list type.")
1645
1646        if trash_at and type(trash_at) is not datetime.datetime:
1647            raise errors.ArgumentError("trash_at must be datetime type.")
1648
1649        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1650            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1651
1652        if self._has_remote_blocks:
1653            # Copy any remote blocks to the local cluster.
1654            self._copy_remote_blocks(remote_blocks={})
1655            self._has_remote_blocks = False
1656
1657        if storage_classes:
1658            self._storage_classes_desired = storage_classes
1659
1660        self._my_block_manager().commit_all()
1661        text = self.manifest_text(strip=False)
1662
1663        if create_collection_record:
1664            if name is None:
1665                name = "New collection"
1666                ensure_unique_name = True
1667
1668            body = {"manifest_text": text,
1669                    "name": name,
1670                    "replication_desired": self.replication_desired}
1671            if owner_uuid:
1672                body["owner_uuid"] = owner_uuid
1673            if properties:
1674                body["properties"] = properties
1675            if self.storage_classes_desired():
1676                body["storage_classes_desired"] = self.storage_classes_desired()
1677            if trash_at:
1678                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1679                body["trash_at"] = t
1680            if preserve_version:
1681                body["preserve_version"] = preserve_version
1682
1683            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1684            self._manifest_locator = self._api_response["uuid"]
1685            self.set_committed(True)
1686
1687        return text

Save collection to a new API record

This method finishes uploading new data blocks and (optionally) creates a new API collection record with the provided data. If a new record is created, this instance becomes associated with that record for future updates like save(). This method returns the saved collection manifest.

Arguments:

  • name: str | None — The name field to use on the new collection record. If not specified, the generic default name "New collection" is used and ensure_unique_name is forced to True.

  • create_collection_record: bool — If True (the default), creates a collection record on the API server. If False, the method finishes all data uploads and only returns the resulting collection manifest without sending it to the API server.

  • owner_uuid: str | None — The owner_uuid field to use on the new collection record.

  • properties: dict[str, Any] | None — The properties field to use on the new collection record.

  • storage_classes: list[str] | None — The storage_classes_desired field to use on the new collection record.

  • trash_at: datetime.datetime | None — The trash_at field to use on the new collection record.

  • ensure_unique_name: bool — This value is passed directly to the Arvados API when creating the collection record. If True, the API server may modify the submitted name to ensure the collection’s name+owner_uuid combination is unique. If False (the default), if a collection already exists with this same name+owner_uuid combination, creating a collection record will raise a validation error.

  • num_retries: int | None — The number of times to retry the API request that creates the collection record. If not specified, uses the num_retries provided when this instance was constructed.

  • preserve_version: bool — This value will be passed directly to the underlying API call. If True, the Arvados API will preserve the versions of this collection both immediately before and after the update. If True when the API server is not configured with collection versioning, this method raises arvados.errors.ArgumentError.

@synchronized
def notify( self, event: Literal['add', 'del', 'mod', 'tok'], collection: RichCollectionBase, name: str, item: Union[arvados.arvfile.ArvadosFile, Collection]) -> None:
1771    @synchronized
1772    def notify(
1773            self,
1774            event: ChangeType,
1775            collection: 'RichCollectionBase',
1776            name: str,
1777            item: CollectionItem,
1778    ) -> None:
1779        if self._callback:
1780            self._callback(event, collection, name, item)

Notify any subscribed callback about a change to this collection

If a callback has been registered with RichCollectionBase.subscribe, it will be called with information about a change to this collection. Then this notification will be propagated to this collection’s root.

Arguments:

  • event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.

  • collection: RichCollectionBase — The collection that was modified.

  • name: str — The name of the file or stream within collection that was modified.

  • item: arvados.arvfile.ArvadosFile | Subcollection — For ADD events, the new contents at name within collection; for DEL events, the item that was removed. For MOD and TOK events, a 2-tuple of the previous item and the new item (may be the same object or different, depending on whether the action involved it being modified in place or replaced).

class Subcollection(RichCollectionBase):
1783class Subcollection(RichCollectionBase):
1784    """Read and manipulate a stream/directory within an Arvados collection
1785
1786    This class represents a single stream (like a directory) within an Arvados
1787    `Collection`. It is returned by `Collection.find` and provides the same API.
1788    Operations that work on the API collection record propagate to the parent
1789    `Collection` object.
1790    """
1791
1792    def __init__(self, parent, name):
1793        super(Subcollection, self).__init__(parent)
1794        self.lock = self.root_collection().lock
1795        self._manifest_text = None
1796        self.name = name
1797        self.num_retries = parent.num_retries
1798
1799    def root_collection(self) -> 'Collection':
1800        return self.parent.root_collection()
1801
1802    def writable(self) -> bool:
1803        return self.root_collection().writable()
1804
1805    def _my_api(self):
1806        return self.root_collection()._my_api()
1807
1808    def _my_keep(self):
1809        return self.root_collection()._my_keep()
1810
1811    def _my_block_manager(self):
1812        return self.root_collection()._my_block_manager()
1813
1814    def stream_name(self) -> str:
1815        return os.path.join(self.parent.stream_name(), self.name)
1816
1817    @synchronized
1818    def clone(
1819            self,
1820            new_parent: Optional['Collection']=None,
1821            new_name: Optional[str]=None,
1822    ) -> 'Subcollection':
1823        c = Subcollection(new_parent, new_name)
1824        c._clonefrom(self)
1825        return c
1826
1827    @must_be_writable
1828    @synchronized
1829    def _reparent(self, newparent, newname):
1830        self.set_committed(False)
1831        self.flush()
1832        self.parent.remove(self.name, recursive=True)
1833        self.parent = newparent
1834        self.name = newname
1835        self.lock = self.parent.root_collection().lock
1836
1837    @synchronized
1838    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1839        """Encode empty directories by using an \056-named (".") empty file"""
1840        if len(self._items) == 0:
1841            return "%s %s 0:0:\\056\n" % (
1842                streams.escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1843        return super(Subcollection, self)._get_manifest_text(stream_name,
1844                                                             strip, normalize,
1845                                                             only_committed)

Read and manipulate a stream/directory within an Arvados collection

This class represents a single stream (like a directory) within an Arvados Collection. It is returned by Collection.find and provides the same API. Operations that work on the API collection record propagate to the parent Collection object.

Subcollection(parent, name)
1792    def __init__(self, parent, name):
1793        super(Subcollection, self).__init__(parent)
1794        self.lock = self.root_collection().lock
1795        self._manifest_text = None
1796        self.name = name
1797        self.num_retries = parent.num_retries
lock
name
num_retries
def root_collection(self) -> Collection:
1799    def root_collection(self) -> 'Collection':
1800        return self.parent.root_collection()

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def writable(self) -> bool:
1802    def writable(self) -> bool:
1803        return self.root_collection().writable()

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True. A Subcollection reports whatever its root collection’s writable() method returns.

def stream_name(self) -> str:
1814    def stream_name(self) -> str:
1815        return os.path.join(self.parent.stream_name(), self.name)

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

@synchronized
def clone( self, new_parent: Optional[Collection] = None, new_name: Optional[str] = None) -> Subcollection:
1817    @synchronized
1818    def clone(
1819            self,
1820            new_parent: Optional['Collection']=None,
1821            new_name: Optional[str]=None,
1822    ) -> 'Subcollection':
1823        c = Subcollection(new_parent, new_name)
1824        c._clonefrom(self)
1825        return c
class CollectionReader(Collection):
1848class CollectionReader(Collection):
1849    """Read-only `Collection` subclass
1850
1851    This class will never create or update any API collection records. You can
1852    use this class for additional code safety when you only need to read
1853    existing collections.
1854    """
1855    def __init__(self, manifest_locator_or_text, *args, **kwargs):
1856        self._in_init = True
1857        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1858        self._in_init = False
1859
1860        # Forego any locking since it should never change once initialized.
1861        self.lock = NoopLock()
1862
1863        # Backwards compatibility with old CollectionReader
1864        # all_streams() and all_files()
1865        self._streams = None
1866
1867    def writable(self) -> bool:
1868        return self._in_init

Read-only Collection subclass

This class will never create or update any API collection records. You can use this class for additional code safety when you only need to read existing collections.

CollectionReader(manifest_locator_or_text, *args, **kwargs)
1855    def __init__(self, manifest_locator_or_text, *args, **kwargs):
1856        self._in_init = True
1857        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1858        self._in_init = False
1859
1860        # Forego any locking since it should never change once initialized.
1861        self.lock = NoopLock()
1862
1863        # Backwards compatibility with old CollectionReader
1864        # all_streams() and all_files()
1865        self._streams = None

Initialize a Collection object

Arguments:

  • manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value None instantiates a new collection with an empty manifest.

  • api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from apiconfig (see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling arvados.api.api to construct an arvados.api.ThreadSafeAPIClient, and use that as the api_client for every Collection.

  • keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its api_client.

  • num_retries: int — The number of times that client requests are retried. Default 10.

  • parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • apiconfig: Mapping[str, str] | None — A mapping with entries for ARVADOS_API_HOST, ARVADOS_API_TOKEN, and optionally ARVADOS_API_HOST_INSECURE. When no api_client is provided, the Collection object constructs one from these settings. If no mapping is provided, calls arvados.config.settings to get these parameters from user configuration.

  • block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • replication_desired: int | None — This controls both the value of the replication_desired field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.

  • storage_classes_desired: list[str] | None — This controls both the value of the storage_classes_desired field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.

  • put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new block_manager. It is unused when a block_manager is provided.

lock
def writable(self) -> bool:
1867    def writable(self) -> bool:
1868        return self._in_init

Indicate whether this collection object can be modified

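This method returns False if this object is a CollectionReader, else True. (A CollectionReader reports True only while its constructor is still running; once initialization finishes it always returns False.)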