arvados.collection

Tools to work with Arvados collections

This module provides high-level interfaces to create, read, and update Arvados collections. Most users will want to instantiate Collection objects, and use methods like Collection.open and Collection.mkdirs to read and write data in the collection. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4"""Tools to work with Arvados collections
   5
   6This module provides high-level interfaces to create, read, and update
   7Arvados collections. Most users will want to instantiate `Collection`
   8objects, and use methods like `Collection.open` and `Collection.mkdirs` to
   9read and write data in the collection. Refer to the Arvados Python SDK
  10cookbook for [an introduction to using the Collection class][cookbook].
  11
  12[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
  13"""
  14
  15import ciso8601
  16import datetime
  17import errno
  18import functools
  19import hashlib
  20import io
  21import logging
  22import os
  23import re
  24import sys
  25import threading
  26import time
  27
  28from collections import deque
  29from stat import *
  30
  31from ._internal import streams
  32from .api import ThreadSafeAPIClient
  33from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
  34from .keep import KeepLocator, KeepClient
  35import arvados.config as config
  36import arvados.errors as errors
  37import arvados.util
  38import arvados.events as events
  39from arvados.retry import retry_method
  40
  41from typing import (
  42    Any,
  43    Callable,
  44    Dict,
  45    IO,
  46    Iterator,
  47    List,
  48    Mapping,
  49    Optional,
  50    Tuple,
  51    Union,
  52)
  53
  54if sys.version_info < (3, 8):
  55    from typing_extensions import Literal
  56else:
  57    from typing import Literal
  58
# Logger for this module.
_logger = logging.getLogger('arvados.collection')

# Change-event kinds passed as the first argument of `notify` by the
# mutating collection methods. The same string values are the members of
# the `ChangeList` / `ChangeType` type aliases below.
ADD = "add"
"""Argument value for `Collection` methods to represent an added item"""
DEL = "del"
"""Argument value for `Collection` methods to represent a removed item"""
MOD = "mod"
"""Argument value for `Collection` methods to represent a modified item"""
TOK = "tok"
"""Argument value for `Collection` methods to represent an item with token differences"""
# `create_type` values for `find_or_create`: COLLECTION creates a
# `Subcollection`; any other value (i.e. FILE) creates an `ArvadosFile`.
FILE = "file"
"""`create_type` value for `Collection.find_or_create`"""
COLLECTION = "collection"
"""`create_type` value for `Collection.find_or_create`"""
  73
  74ChangeList = List[Union[
  75    Tuple[Literal[ADD, DEL], str, 'Collection'],
  76    Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
  77]]
  78ChangeType = Literal[ADD, DEL, MOD, TOK]
  79CollectionItem = Union[ArvadosFile, 'Collection']
  80ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
  81CreateType = Literal[COLLECTION, FILE]
  82Properties = Dict[str, Any]
  83StorageClasses = List[str]
  84
  85class CollectionBase(object):
  86    """Abstract base class for Collection classes
  87
  88    .. ATTENTION:: Internal
  89       This class is meant to be used by other parts of the SDK. User code
  90       should instantiate or subclass `Collection` or one of its subclasses
  91       directly.
  92    """
  93
  94    def __enter__(self):
  95        """Enter a context block with this collection instance"""
  96        return self
  97
  98    def __exit__(self, exc_type, exc_value, traceback):
  99        """Exit a context block with this collection instance"""
 100        pass
 101
 102    def _my_keep(self):
 103        if self._keep_client is None:
 104            self._keep_client = KeepClient(api_client=self._api_client,
 105                                           num_retries=self.num_retries)
 106        return self._keep_client
 107
 108    def stripped_manifest(self) -> str:
 109        """Create a copy of the collection manifest with only size hints
 110
 111        This method returns a string with the current collection's manifest
 112        text with all non-portable locator hints like permission hints and
 113        remote cluster hints removed. The only hints in the returned manifest
 114        will be size hints.
 115        """
 116        raw = self.manifest_text()
 117        clean = []
 118        for line in raw.split("\n"):
 119            fields = line.split()
 120            if fields:
 121                clean_fields = fields[:1] + [
 122                    (re.sub(r'\+[^\d][^\+]*', '', x)
 123                     if re.match(arvados.util.keep_locator_pattern, x)
 124                     else x)
 125                    for x in fields[1:]]
 126                clean += [' '.join(clean_fields), "\n"]
 127        return ''.join(clean)
 128
 129
 130class _WriterFile(_FileLikeObjectBase):
 131    def __init__(self, coll_writer, name):
 132        super(_WriterFile, self).__init__(name, 'wb')
 133        self.dest = coll_writer
 134
 135    def close(self):
 136        super(_WriterFile, self).close()
 137        self.dest.finish_current_file()
 138
 139    @_FileLikeObjectBase._before_close
 140    def write(self, data):
 141        self.dest.write(data)
 142
 143    @_FileLikeObjectBase._before_close
 144    def writelines(self, seq):
 145        for data in seq:
 146            self.write(data)
 147
 148    @_FileLikeObjectBase._before_close
 149    def flush(self):
 150        self.dest.flush_data()
 151
 152
 153class RichCollectionBase(CollectionBase):
 154    """Base class for Collection classes
 155
 156    .. ATTENTION:: Internal
 157       This class is meant to be used by other parts of the SDK. User code
 158       should instantiate or subclass `Collection` or one of its subclasses
 159       directly.
 160    """
 161
 162    def __init__(self, parent=None):
 163        self.parent = parent
 164        self._committed = False
 165        self._has_remote_blocks = False
 166        self._callback = None
 167        self._items = {}
 168
 169    def _my_api(self):
 170        raise NotImplementedError()
 171
 172    def _my_keep(self):
 173        raise NotImplementedError()
 174
 175    def _my_block_manager(self):
 176        raise NotImplementedError()
 177
 178    def writable(self) -> bool:
 179        """Indicate whether this collection object can be modified
 180
 181        This method returns `False` if this object is a `CollectionReader`,
 182        else `True`.
 183        """
 184        raise NotImplementedError()
 185
 186    def root_collection(self) -> 'Collection':
 187        """Get this collection's root collection object
 188
 189        If you open a subcollection with `Collection.find`, calling this method
 190        on that subcollection returns the source Collection object.
 191        """
 192        raise NotImplementedError()
 193
 194    def stream_name(self) -> str:
 195        """Get the name of the manifest stream represented by this collection
 196
 197        If you open a subcollection with `Collection.find`, calling this method
 198        on that subcollection returns the name of the stream you opened.
 199        """
 200        raise NotImplementedError()
 201
 202    @synchronized
 203    def has_remote_blocks(self) -> bool:
 204        """Indiciate whether the collection refers to remote data
 205
 206        Returns `True` if the collection manifest includes any Keep locators
 207        with a remote hint (`+R`), else `False`.
 208        """
 209        if self._has_remote_blocks:
 210            return True
 211        for item in self:
 212            if self[item].has_remote_blocks():
 213                return True
 214        return False
 215
 216    @synchronized
 217    def set_has_remote_blocks(self, val: bool) -> None:
 218        """Cache whether this collection refers to remote blocks
 219
 220        .. ATTENTION:: Internal
 221           This method is only meant to be used by other Collection methods.
 222
 223        Set this collection's cached "has remote blocks" flag to the given
 224        value.
 225        """
 226        self._has_remote_blocks = val
 227        if self.parent:
 228            self.parent.set_has_remote_blocks(val)
 229
 230    @must_be_writable
 231    @synchronized
 232    def find_or_create(
 233            self,
 234            path: str,
 235            create_type: CreateType,
 236    ) -> CollectionItem:
 237        """Get the item at the given path, creating it if necessary
 238
 239        If `path` refers to a stream in this collection, returns a
 240        corresponding `Subcollection` object. If `path` refers to a file in
 241        this collection, returns a corresponding
 242        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 243        this collection, then this method creates a new object and returns
 244        it, creating parent streams as needed. The type of object created is
 245        determined by the value of `create_type`.
 246
 247        Arguments:
 248
 249        * path: str --- The path to find or create within this collection.
 250
 251        * create_type: Literal[COLLECTION, FILE] --- The type of object to
 252          create at `path` if one does not exist. Passing `COLLECTION`
 253          creates a stream and returns the corresponding
 254          `Subcollection`. Passing `FILE` creates a new file and returns the
 255          corresponding `arvados.arvfile.ArvadosFile`.
 256        """
 257        pathcomponents = path.split("/", 1)
 258        if pathcomponents[0]:
 259            item = self._items.get(pathcomponents[0])
 260            if len(pathcomponents) == 1:
 261                if item is None:
 262                    # create new file
 263                    if create_type == COLLECTION:
 264                        item = Subcollection(self, pathcomponents[0])
 265                    else:
 266                        item = ArvadosFile(self, pathcomponents[0])
 267                    self._items[pathcomponents[0]] = item
 268                    self.set_committed(False)
 269                    self.notify(ADD, self, pathcomponents[0], item)
 270                return item
 271            else:
 272                if item is None:
 273                    # create new collection
 274                    item = Subcollection(self, pathcomponents[0])
 275                    self._items[pathcomponents[0]] = item
 276                    self.set_committed(False)
 277                    self.notify(ADD, self, pathcomponents[0], item)
 278                if isinstance(item, RichCollectionBase):
 279                    return item.find_or_create(pathcomponents[1], create_type)
 280                else:
 281                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 282        else:
 283            return self
 284
 285    @synchronized
 286    def find(self, path: str) -> CollectionItem:
 287        """Get the item at the given path
 288
 289        If `path` refers to a stream in this collection, returns a
 290        corresponding `Subcollection` object. If `path` refers to a file in
 291        this collection, returns a corresponding
 292        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 293        this collection, then this method raises `NotADirectoryError`.
 294
 295        Arguments:
 296
 297        * path: str --- The path to find or create within this collection.
 298        """
 299        if not path:
 300            raise errors.ArgumentError("Parameter 'path' is empty.")
 301
 302        pathcomponents = path.split("/", 1)
 303        if pathcomponents[0] == '':
 304            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 305
 306        item = self._items.get(pathcomponents[0])
 307        if item is None:
 308            return None
 309        elif len(pathcomponents) == 1:
 310            return item
 311        else:
 312            if isinstance(item, RichCollectionBase):
 313                if pathcomponents[1]:
 314                    return item.find(pathcomponents[1])
 315                else:
 316                    return item
 317            else:
 318                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 319
 320    @synchronized
 321    def mkdirs(self, path: str) -> 'Subcollection':
 322        """Create and return a subcollection at `path`
 323
 324        If `path` exists within this collection, raises `FileExistsError`.
 325        Otherwise, creates a stream at that path and returns the
 326        corresponding `Subcollection`.
 327        """
 328        if self.find(path) != None:
 329            raise IOError(errno.EEXIST, "Directory or file exists", path)
 330
 331        return self.find_or_create(path, COLLECTION)
 332
 333    def open(
 334            self,
 335            path: str,
 336            mode: str="r",
 337            encoding: Optional[str]=None
 338    ) -> IO:
 339        """Open a file-like object within the collection
 340
 341        This method returns a file-like object that can read and/or write the
 342        file located at `path` within the collection. If you attempt to write
 343        a `path` that does not exist, the file is created with `find_or_create`.
 344        If the file cannot be opened for any other reason, this method raises
 345        `OSError` with an appropriate errno.
 346
 347        Arguments:
 348
 349        * path: str --- The path of the file to open within this collection
 350
 351        * mode: str --- The mode to open this file. Supports all the same
 352          values as `builtins.open`.
 353
 354        * encoding: str | None --- The text encoding of the file. Only used
 355          when the file is opened in text mode. The default is
 356          platform-dependent.
 357
 358        """
 359        if not re.search(r'^[rwa][bt]?\+?$', mode):
 360            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 361
 362        if mode[0] == 'r' and '+' not in mode:
 363            fclass = ArvadosFileReader
 364            arvfile = self.find(path)
 365        elif not self.writable():
 366            raise IOError(errno.EROFS, "Collection is read only")
 367        else:
 368            fclass = ArvadosFileWriter
 369            arvfile = self.find_or_create(path, FILE)
 370
 371        if arvfile is None:
 372            raise IOError(errno.ENOENT, "File not found", path)
 373        if not isinstance(arvfile, ArvadosFile):
 374            raise IOError(errno.EISDIR, "Is a directory", path)
 375
 376        if mode[0] == 'w':
 377            arvfile.truncate(0)
 378
 379        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
 380        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
 381        if 'b' not in mode:
 382            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
 383            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
 384        return f
 385
 386    def modified(self) -> bool:
 387        """Indicate whether this collection has an API server record
 388
 389        Returns `False` if this collection corresponds to a record loaded from
 390        the API server, `True` otherwise.
 391        """
 392        return not self.committed()
 393
 394    @synchronized
 395    def committed(self):
 396        """Indicate whether this collection has an API server record
 397
 398        Returns `True` if this collection corresponds to a record loaded from
 399        the API server, `False` otherwise.
 400        """
 401        return self._committed
 402
 403    @synchronized
 404    def set_committed(self, value: bool=True):
 405        """Cache whether this collection has an API server record
 406
 407        .. ATTENTION:: Internal
 408           This method is only meant to be used by other Collection methods.
 409
 410        Set this collection's cached "committed" flag to the given
 411        value and propagates it as needed.
 412        """
 413        if value == self._committed:
 414            return
 415        if value:
 416            for k,v in self._items.items():
 417                v.set_committed(True)
 418            self._committed = True
 419        else:
 420            self._committed = False
 421            if self.parent is not None:
 422                self.parent.set_committed(False)
 423
 424    @synchronized
 425    def __iter__(self) -> Iterator[str]:
 426        """Iterate names of streams and files in this collection
 427
 428        This method does not recurse. It only iterates the contents of this
 429        collection's corresponding stream.
 430        """
 431        return iter(self._items)
 432
 433    @synchronized
 434    def __getitem__(self, k: str) -> CollectionItem:
 435        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
 436
 437        This method does not recurse. If you want to search a path, use
 438        `RichCollectionBase.find` instead.
 439        """
 440        return self._items[k]
 441
 442    @synchronized
 443    def __contains__(self, k: str) -> bool:
 444        """Indicate whether this collection has an item with this name
 445
 446        This method does not recurse. It you want to check a path, use
 447        `RichCollectionBase.exists` instead.
 448        """
 449        return k in self._items
 450
 451    @synchronized
 452    def __len__(self):
 453        """Get the number of items directly contained in this collection
 454
 455        This method does not recurse. It only counts the streams and files
 456        in this collection's corresponding stream.
 457        """
 458        return len(self._items)
 459
 460    @must_be_writable
 461    @synchronized
 462    def __delitem__(self, p: str) -> None:
 463        """Delete an item from this collection's stream
 464
 465        This method does not recurse. If you want to remove an item by a
 466        path, use `RichCollectionBase.remove` instead.
 467        """
 468        del self._items[p]
 469        self.set_committed(False)
 470        self.notify(DEL, self, p, None)
 471
 472    @synchronized
 473    def keys(self) -> Iterator[str]:
 474        """Iterate names of streams and files in this collection
 475
 476        This method does not recurse. It only iterates the contents of this
 477        collection's corresponding stream.
 478        """
 479        return self._items.keys()
 480
 481    @synchronized
 482    def values(self) -> List[CollectionItem]:
 483        """Get a list of objects in this collection's stream
 484
 485        The return value includes a `Subcollection` for every stream, and an
 486        `arvados.arvfile.ArvadosFile` for every file, directly within this
 487        collection's stream.  This method does not recurse.
 488        """
 489        return list(self._items.values())
 490
 491    @synchronized
 492    def items(self) -> List[Tuple[str, CollectionItem]]:
 493        """Get a list of `(name, object)` tuples from this collection's stream
 494
 495        The return value includes a `Subcollection` for every stream, and an
 496        `arvados.arvfile.ArvadosFile` for every file, directly within this
 497        collection's stream.  This method does not recurse.
 498        """
 499        return list(self._items.items())
 500
 501    def exists(self, path: str) -> bool:
 502        """Indicate whether this collection includes an item at `path`
 503
 504        This method returns `True` if `path` refers to a stream or file within
 505        this collection, else `False`.
 506
 507        Arguments:
 508
 509        * path: str --- The path to check for existence within this collection
 510        """
 511        return self.find(path) is not None
 512
 513    @must_be_writable
 514    @synchronized
 515    def remove(self, path: str, recursive: bool=False) -> None:
 516        """Remove the file or stream at `path`
 517
 518        Arguments:
 519
 520        * path: str --- The path of the item to remove from the collection
 521
 522        * recursive: bool --- Controls the method's behavior if `path` refers
 523          to a nonempty stream. If `False` (the default), this method raises
 524          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
 525          items under the stream.
 526        """
 527        if not path:
 528            raise errors.ArgumentError("Parameter 'path' is empty.")
 529
 530        pathcomponents = path.split("/", 1)
 531        item = self._items.get(pathcomponents[0])
 532        if item is None:
 533            raise IOError(errno.ENOENT, "File not found", path)
 534        if len(pathcomponents) == 1:
 535            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
 536                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
 537            deleteditem = self._items[pathcomponents[0]]
 538            del self._items[pathcomponents[0]]
 539            self.set_committed(False)
 540            self.notify(DEL, self, pathcomponents[0], deleteditem)
 541        else:
 542            item.remove(pathcomponents[1], recursive=recursive)
 543
 544    def _clonefrom(self, source):
 545        for k,v in source.items():
 546            self._items[k] = v.clone(self, k)
 547
 548    def clone(self):
 549        raise NotImplementedError()
 550
 551    @must_be_writable
 552    @synchronized
 553    def add(
 554            self,
 555            source_obj: CollectionItem,
 556            target_name: str,
 557            overwrite: bool=False,
 558            reparent: bool=False,
 559    ) -> None:
 560        """Copy or move a file or subcollection object to this collection
 561
 562        Arguments:
 563
 564        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
 565          to add to this collection
 566
 567        * target_name: str --- The path inside this collection where
 568          `source_obj` should be added.
 569
 570        * overwrite: bool --- Controls the behavior of this method when the
 571          collection already contains an object at `target_name`. If `False`
 572          (the default), this method will raise `FileExistsError`. If `True`,
 573          the object at `target_name` will be replaced with `source_obj`.
 574
 575        * reparent: bool --- Controls whether this method copies or moves
 576          `source_obj`. If `False` (the default), `source_obj` is copied into
 577          this collection. If `True`, `source_obj` is moved into this
 578          collection.
 579        """
 580        if target_name in self and not overwrite:
 581            raise IOError(errno.EEXIST, "File already exists", target_name)
 582
 583        modified_from = None
 584        if target_name in self:
 585            modified_from = self[target_name]
 586
 587        # Actually make the move or copy.
 588        if reparent:
 589            source_obj._reparent(self, target_name)
 590            item = source_obj
 591        else:
 592            item = source_obj.clone(self, target_name)
 593
 594        self._items[target_name] = item
 595        self.set_committed(False)
 596        if not self._has_remote_blocks and source_obj.has_remote_blocks():
 597            self.set_has_remote_blocks(True)
 598
 599        if modified_from:
 600            self.notify(MOD, self, target_name, (modified_from, item))
 601        else:
 602            self.notify(ADD, self, target_name, item)
 603
 604    def _get_src_target(self, source, target_path, source_collection, create_dest):
 605        if source_collection is None:
 606            source_collection = self
 607
 608        # Find the object
 609        if isinstance(source, str):
 610            source_obj = source_collection.find(source)
 611            if source_obj is None:
 612                raise IOError(errno.ENOENT, "File not found", source)
 613            sourcecomponents = source.split("/")
 614        else:
 615            source_obj = source
 616            sourcecomponents = None
 617
 618        # Find parent collection the target path
 619        targetcomponents = target_path.split("/")
 620
 621        # Determine the name to use.
 622        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 623
 624        if not target_name:
 625            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 626
 627        if create_dest:
 628            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 629        else:
 630            if len(targetcomponents) > 1:
 631                target_dir = self.find("/".join(targetcomponents[0:-1]))
 632            else:
 633                target_dir = self
 634
 635        if target_dir is None:
 636            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 637
 638        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
 639            target_dir = target_dir[target_name]
 640            target_name = sourcecomponents[-1]
 641
 642        return (source_obj, target_dir, target_name)
 643
 644    @must_be_writable
 645    @synchronized
 646    def copy(
 647            self,
 648            source: Union[str, CollectionItem],
 649            target_path: str,
 650            source_collection: Optional['RichCollectionBase']=None,
 651            overwrite: bool=False,
 652    ) -> None:
 653        """Copy a file or subcollection object to this collection
 654
 655        Arguments:
 656
 657        * source: str | arvados.arvfile.ArvadosFile |
 658          arvados.collection.Subcollection --- The file or subcollection to
 659          add to this collection. If `source` is a str, the object will be
 660          found by looking up this path from `source_collection` (see
 661          below).
 662
 663        * target_path: str --- The path inside this collection where the
 664          source object should be added.
 665
 666        * source_collection: arvados.collection.Collection | None --- The
 667          collection to find the source object from when `source` is a
 668          path. Defaults to the current collection (`self`).
 669
 670        * overwrite: bool --- Controls the behavior of this method when the
 671          collection already contains an object at `target_path`. If `False`
 672          (the default), this method will raise `FileExistsError`. If `True`,
 673          the object at `target_path` will be replaced with `source_obj`.
 674        """
 675        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 676        target_dir.add(source_obj, target_name, overwrite, False)
 677
 678    @must_be_writable
 679    @synchronized
 680    def rename(
 681            self,
 682            source: Union[str, CollectionItem],
 683            target_path: str,
 684            source_collection: Optional['RichCollectionBase']=None,
 685            overwrite: bool=False,
 686    ) -> None:
 687        """Move a file or subcollection object to this collection
 688
 689        Arguments:
 690
 691        * source: str | arvados.arvfile.ArvadosFile |
 692          arvados.collection.Subcollection --- The file or subcollection to
 693          add to this collection. If `source` is a str, the object will be
 694          found by looking up this path from `source_collection` (see
 695          below).
 696
 697        * target_path: str --- The path inside this collection where the
 698          source object should be added.
 699
 700        * source_collection: arvados.collection.Collection | None --- The
 701          collection to find the source object from when `source` is a
 702          path. Defaults to the current collection (`self`).
 703
 704        * overwrite: bool --- Controls the behavior of this method when the
 705          collection already contains an object at `target_path`. If `False`
 706          (the default), this method will raise `FileExistsError`. If `True`,
 707          the object at `target_path` will be replaced with `source_obj`.
 708        """
 709        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 710        if not source_obj.writable():
 711            raise IOError(errno.EROFS, "Source collection is read only", source)
 712        target_dir.add(source_obj, target_name, overwrite, True)
 713
 714    def portable_manifest_text(self, stream_name: str=".") -> str:
 715        """Get the portable manifest text for this collection
 716
 717        The portable manifest text is normalized, and does not include access
 718        tokens. This method does not flush outstanding blocks to Keep.
 719
 720        Arguments:
 721
 722        * stream_name: str --- The name to use for this collection's stream in
 723          the generated manifest. Default `'.'`.
 724        """
 725        return self._get_manifest_text(stream_name, True, True)
 726
 727    @synchronized
 728    def manifest_text(
 729            self,
 730            stream_name: str=".",
 731            strip: bool=False,
 732            normalize: bool=False,
 733            only_committed: bool=False,
 734    ) -> str:
 735        """Get the manifest text for this collection
 736
 737        Arguments:
 738
 739        * stream_name: str --- The name to use for this collection's stream in
 740          the generated manifest. Default `'.'`.
 741
 742        * strip: bool --- Controls whether or not the returned manifest text
 743          includes access tokens. If `False` (the default), the manifest text
 744          will include access tokens. If `True`, the manifest text will not
 745          include access tokens.
 746
 747        * normalize: bool --- Controls whether or not the returned manifest
 748          text is normalized. Default `False`.
 749
 750        * only_committed: bool --- Controls whether or not this method uploads
 751          pending data to Keep before building and returning the manifest text.
 752          If `False` (the default), this method will finish uploading all data
 753          to Keep, then return the final manifest. If `True`, this method will
 754          build and return a manifest that only refers to the data that has
 755          finished uploading at the time this method was called.
 756        """
 757        if not only_committed:
 758            self._my_block_manager().commit_all()
 759        return self._get_manifest_text(stream_name, strip, normalize,
 760                                       only_committed=only_committed)
 761
    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        If the collection has uncommitted changes, has no cached manifest
        text, or `normalize` is true, the manifest is rebuilt from the items.
        All files directly in this stream form one normalized stream line,
        followed by each subcollection's manifest in sorted name order.
        Otherwise the cached `self._manifest_text` is returned (stripped of
        non-size hints when `strip` is true).

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False, block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.
          Default False.

        """

        # NOTE(review): `_manifest_text` is not initialized in this class's
        # __init__; presumably subclasses set it -- confirm for every subclass
        # that can reach the `else` branch below.
        if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Collect this file's segments; every file in this stream
                # shares the single `stream` dict keyed by filename.
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # Segment still points at an in-memory buffer block.
                        if only_committed:
                            continue
                        # Substitute the locator reported by the buffer block.
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(streams.LocatorAndRange(
                        loc,
                        KeepLocator(loc).size,
                        segment.segment_offset,
                        segment.range_size,
                    ))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n")
            # NOTE(review): recursing through manifest_text() (not
            # _get_manifest_text) means that when only_committed is False the
            # block manager's commit_all() runs again for each subcollection,
            # even when this call came from portable_manifest_text.
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(
                    stream_name=os.path.join(stream_name, dirname),
                    strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text
 819
 820    @synchronized
 821    def _copy_remote_blocks(self, remote_blocks={}):
 822        """Scan through the entire collection and ask Keep to copy remote blocks.
 823
 824        When accessing a remote collection, blocks will have a remote signature
 825        (+R instead of +A). Collect these signatures and request Keep to copy the
 826        blocks to the local cluster, returning local (+A) signatures.
 827
 828        :remote_blocks:
 829          Shared cache of remote to local block mappings. This is used to avoid
 830          doing extra work when blocks are shared by more than one file in
 831          different subdirectories.
 832
 833        """
 834        for item in self:
 835            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
 836        return remote_blocks
 837
 838    @synchronized
 839    def diff(
 840            self,
 841            end_collection: 'RichCollectionBase',
 842            prefix: str=".",
 843            holding_collection: Optional['Collection']=None,
 844    ) -> ChangeList:
 845        """Build a list of differences between this collection and another
 846
 847        Arguments:
 848
 849        * end_collection: arvados.collection.RichCollectionBase --- A
 850          collection object with the desired end state. The returned diff
 851          list will describe how to go from the current collection object
 852          `self` to `end_collection`.
 853
 854        * prefix: str --- The name to use for this collection's stream in
 855          the diff list. Default `'.'`.
 856
 857        * holding_collection: arvados.collection.Collection | None --- A
 858          collection object used to hold objects for the returned diff
 859          list. By default, a new empty collection is created.
 860        """
 861        changes = []
 862        if holding_collection is None:
 863            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
 864        for k in self:
 865            if k not in end_collection:
 866               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
 867        for k in end_collection:
 868            if k in self:
 869                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
 870                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
 871                elif end_collection[k] != self[k]:
 872                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 873                else:
 874                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 875            else:
 876                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
 877        return changes
 878
 879    @must_be_writable
 880    @synchronized
 881    def apply(self, changes: ChangeList) -> None:
 882        """Apply a list of changes from to this collection
 883
 884        This method takes a list of changes generated by
 885        `RichCollectionBase.diff` and applies it to this
 886        collection. Afterward, the state of this collection object will
 887        match the state of `end_collection` passed to `diff`. If a change
 888        conflicts with a local change, it will be saved to an alternate path
 889        indicating the conflict.
 890
 891        Arguments:
 892
 893        * changes: arvados.collection.ChangeList --- The list of differences
 894          generated by `RichCollectionBase.diff`.
 895        """
 896        if changes:
 897            self.set_committed(False)
 898        for change in changes:
 899            event_type = change[0]
 900            path = change[1]
 901            initial = change[2]
 902            local = self.find(path)
 903            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
 904                                                                    time.gmtime()))
 905            if event_type == ADD:
 906                if local is None:
 907                    # No local file at path, safe to copy over new file
 908                    self.copy(initial, path)
 909                elif local is not None and local != initial:
 910                    # There is already local file and it is different:
 911                    # save change to conflict file.
 912                    self.copy(initial, conflictpath)
 913            elif event_type == MOD or event_type == TOK:
 914                final = change[3]
 915                if local == initial:
 916                    # Local matches the "initial" item so it has not
 917                    # changed locally and is safe to update.
 918                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
 919                        # Replace contents of local file with new contents
 920                        local.replace_contents(final)
 921                    else:
 922                        # Overwrite path with new item; this can happen if
 923                        # path was a file and is now a collection or vice versa
 924                        self.copy(final, path, overwrite=True)
 925                else:
 926                    # Local is missing (presumably deleted) or local doesn't
 927                    # match the "start" value, so save change to conflict file
 928                    self.copy(final, conflictpath)
 929            elif event_type == DEL:
 930                if local == initial:
 931                    # Local item matches "initial" value, so it is safe to remove.
 932                    self.remove(path, recursive=True)
 933                # else, the file is modified or already removed, in either
 934                # case we don't want to try to remove it.
 935
 936    def portable_data_hash(self) -> str:
 937        """Get the portable data hash for this collection's manifest"""
 938        if self._manifest_locator and self.committed():
 939            # If the collection is already saved on the API server, and it's committed
 940            # then return API server's PDH response.
 941            return self._portable_data_hash
 942        else:
 943            stripped = self.portable_manifest_text().encode()
 944            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 945
 946    @synchronized
 947    def subscribe(self, callback: ChangeCallback) -> None:
 948        """Set a notify callback for changes to this collection
 949
 950        Arguments:
 951
 952        * callback: arvados.collection.ChangeCallback --- The callable to
 953          call each time the collection is changed.
 954        """
 955        if self._callback is None:
 956            self._callback = callback
 957        else:
 958            raise errors.ArgumentError("A callback is already set on this collection.")
 959
 960    @synchronized
 961    def unsubscribe(self) -> None:
 962        """Remove any notify callback set for changes to this collection"""
 963        if self._callback is not None:
 964            self._callback = None
 965
 966    @synchronized
 967    def notify(
 968            self,
 969            event: ChangeType,
 970            collection: 'RichCollectionBase',
 971            name: str,
 972            item: CollectionItem,
 973    ) -> None:
 974        """Notify any subscribed callback about a change to this collection
 975
 976        .. ATTENTION:: Internal
 977           This method is only meant to be used by other Collection methods.
 978
 979        If a callback has been registered with `RichCollectionBase.subscribe`,
 980        it will be called with information about a change to this collection.
 981        Then this notification will be propagated to this collection's root.
 982
 983        Arguments:
 984
 985        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 986          the collection.
 987
 988        * collection: arvados.collection.RichCollectionBase --- The
 989          collection that was modified.
 990
 991        * name: str --- The name of the file or stream within `collection` that
 992          was modified.
 993
 994        * item: arvados.arvfile.ArvadosFile |
 995          arvados.collection.Subcollection --- The new contents at `name`
 996          within `collection`.
 997        """
 998        if self._callback:
 999            self._callback(event, collection, name, item)
1000        self.root_collection().notify(event, collection, name, item)
1001
1002    @synchronized
1003    def __eq__(self, other: Any) -> bool:
1004        """Indicate whether this collection object is equal to another"""
1005        if other is self:
1006            return True
1007        if not isinstance(other, RichCollectionBase):
1008            return False
1009        if len(self._items) != len(other):
1010            return False
1011        for k in self._items:
1012            if k not in other:
1013                return False
1014            if self._items[k] != other[k]:
1015                return False
1016        return True
1017
1018    def __ne__(self, other: Any) -> bool:
1019        """Indicate whether this collection object is not equal to another"""
1020        return not self.__eq__(other)
1021
1022    @synchronized
1023    def flush(self) -> None:
1024        """Upload any pending data to Keep"""
1025        for e in self.values():
1026            e.flush()
1027
1028
1029class Collection(RichCollectionBase):
1030    """Read and manipulate an Arvados collection
1031
1032    This class provides a high-level interface to create, read, and update
1033    Arvados collections and their contents. Refer to the Arvados Python SDK
1034    cookbook for [an introduction to using the Collection class][cookbook].
1035
1036    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1037    """
1038
1039    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1040                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1041                 keep_client: Optional['arvados.keep.KeepClient']=None,
1042                 num_retries: int=10,
1043                 parent: Optional['Collection']=None,
1044                 apiconfig: Optional[Mapping[str, str]]=None,
1045                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1046                 replication_desired: Optional[int]=None,
1047                 storage_classes_desired: Optional[List[str]]=None,
1048                 put_threads: Optional[int]=None):
1049        """Initialize a Collection object
1050
1051        Arguments:
1052
1053        * manifest_locator_or_text: str | None --- This string can contain a
1054          collection manifest text, portable data hash, or UUID. When given a
1055          portable data hash or UUID, this instance will load a collection
1056          record from the API server. Otherwise, this instance will represent a
1057          new collection without an API server record. The default value `None`
1058          instantiates a new collection with an empty manifest.
1059
1060        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1061          Arvados API client object this instance uses to make requests. If
1062          none is given, this instance creates its own client using the
1063          settings from `apiconfig` (see below). If your client instantiates
1064          many Collection objects, you can help limit memory utilization by
1065          calling `arvados.api.api` to construct an
1066          `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
1067          for every Collection.
1068
1069        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1070          object this instance uses to make requests. If none is given, this
1071          instance creates its own client using its `api_client`.
1072
1073        * num_retries: int --- The number of times that client requests are
1074          retried. Default 10.
1075
1076        * parent: arvados.collection.Collection | None --- The parent Collection
1077          object of this instance, if any. This argument is primarily used by
1078          other Collection methods; user client code shouldn't need to use it.
1079
1080        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1081          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1082          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1083          Collection object constructs one from these settings. If no
1084          mapping is provided, calls `arvados.config.settings` to get these
1085          parameters from user configuration.
1086
1087        * block_manager: arvados.arvfile._BlockManager | None --- The
1088          _BlockManager object used by this instance to coordinate reading
1089          and writing Keep data blocks. If none is given, this instance
1090          constructs its own. This argument is primarily used by other
1091          Collection methods; user client code shouldn't need to use it.
1092
1093        * replication_desired: int | None --- This controls both the value of
1094          the `replication_desired` field on API collection records saved by
1095          this class, as well as the number of Keep services that the object
1096          writes new data blocks to. If none is given, uses the default value
1097          configured for the cluster.
1098
1099        * storage_classes_desired: list[str] | None --- This controls both
1100          the value of the `storage_classes_desired` field on API collection
1101          records saved by this class, as well as selecting which specific
1102          Keep services the object writes new data blocks to. If none is
1103          given, defaults to an empty list.
1104
1105        * put_threads: int | None --- The number of threads to run
1106          simultaneously to upload data blocks to Keep. This value is used when
1107          building a new `block_manager`. It is unused when a `block_manager`
1108          is provided.
1109        """
1110
1111        if storage_classes_desired and type(storage_classes_desired) is not list:
1112            raise errors.ArgumentError("storage_classes_desired must be list type.")
1113
1114        super(Collection, self).__init__(parent)
1115        self._api_client = api_client
1116        self._keep_client = keep_client
1117
1118        # Use the keep client from ThreadSafeAPIClient
1119        if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
1120            self._keep_client = self._api_client.keep
1121
1122        self._block_manager = block_manager
1123        self.replication_desired = replication_desired
1124        self._storage_classes_desired = storage_classes_desired
1125        self.put_threads = put_threads
1126
1127        if apiconfig:
1128            self._config = apiconfig
1129        else:
1130            self._config = config.settings()
1131
1132        self.num_retries = num_retries
1133        self._manifest_locator = None
1134        self._manifest_text = None
1135        self._portable_data_hash = None
1136        self._api_response = None
1137        self._past_versions = set()
1138
1139        self.lock = threading.RLock()
1140        self.events = None
1141
1142        if manifest_locator_or_text:
1143            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1144                self._manifest_locator = manifest_locator_or_text
1145            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1146                self._manifest_locator = manifest_locator_or_text
1147                if not self._has_local_collection_uuid():
1148                    self._has_remote_blocks = True
1149            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1150                self._manifest_text = manifest_locator_or_text
1151                if '+R' in self._manifest_text:
1152                    self._has_remote_blocks = True
1153            else:
1154                raise errors.ArgumentError(
1155                    "Argument to CollectionReader is not a manifest or a collection UUID")
1156
1157            try:
1158                self._populate()
1159            except errors.SyntaxError as e:
1160                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1161
1162    def storage_classes_desired(self) -> List[str]:
1163        """Get this collection's `storage_classes_desired` value"""
1164        return self._storage_classes_desired or []
1165
1166    def root_collection(self) -> 'Collection':
1167        return self
1168
1169    def get_properties(self) -> Properties:
1170        """Get this collection's properties
1171
1172        This method always returns a dict. If this collection object does not
1173        have an associated API record, or that record does not have any
1174        properties set, this method returns an empty dict.
1175        """
1176        if self._api_response and self._api_response["properties"]:
1177            return self._api_response["properties"]
1178        else:
1179            return {}
1180
1181    def get_trash_at(self) -> Optional[datetime.datetime]:
1182        """Get this collection's `trash_at` field
1183
1184        This method parses the `trash_at` field of the collection's API
1185        record and returns a datetime from it. If that field is not set, or
1186        this collection object does not have an associated API record,
1187        returns None.
1188        """
1189        if self._api_response and self._api_response["trash_at"]:
1190            try:
1191                return ciso8601.parse_datetime(self._api_response["trash_at"])
1192            except ValueError:
1193                return None
1194        else:
1195            return None
1196
1197    def stream_name(self) -> str:
1198        return "."
1199
1200    def writable(self) -> bool:
1201        return True
1202
1203    @synchronized
1204    def known_past_version(
1205            self,
1206            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1207    ) -> bool:
1208        """Indicate whether an API record for this collection has been seen before
1209
1210        As this collection object loads records from the API server, it records
1211        their `modified_at` and `portable_data_hash` fields. This method accepts
1212        a 2-tuple with values for those fields, and returns `True` if the
1213        combination was previously loaded.
1214        """
1215        return modified_at_and_portable_data_hash in self._past_versions
1216
1217    @synchronized
1218    @retry_method
1219    def update(
1220            self,
1221            other: Optional['Collection']=None,
1222            num_retries: Optional[int]=None,
1223    ) -> None:
1224        """Merge another collection's contents into this one
1225
1226        This method compares the manifest of this collection instance with
1227        another, then updates this instance's manifest with changes from the
1228        other, renaming files to flag conflicts where necessary.
1229
1230        When called without any arguments, this method reloads the collection's
1231        API record, and updates this instance with any changes that have
1232        appeared server-side. If this instance does not have a corresponding
1233        API record, this method raises `arvados.errors.ArgumentError`.
1234
1235        Arguments:
1236
1237        * other: arvados.collection.Collection | None --- The collection
1238          whose contents should be merged into this instance. When not
1239          provided, this method reloads this collection's API record and
1240          constructs a Collection object from it.  If this instance does not
1241          have a corresponding API record, this method raises
1242          `arvados.errors.ArgumentError`.
1243
1244        * num_retries: int | None --- The number of times to retry reloading
1245          the collection's API record from the API server. If not specified,
1246          uses the `num_retries` provided when this instance was constructed.
1247        """
1248        if other is None:
1249            if self._manifest_locator is None:
1250                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1251            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1252            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1253                response.get("portable_data_hash") != self.portable_data_hash()):
1254                # The record on the server is different from our current one, but we've seen it before,
1255                # so ignore it because it's already been merged.
1256                # However, if it's the same as our current record, proceed with the update, because we want to update
1257                # our tokens.
1258                return
1259            else:
1260                self._remember_api_response(response)
1261            other = CollectionReader(response["manifest_text"])
1262        baseline = CollectionReader(self._manifest_text)
1263        self.apply(baseline.diff(other))
1264        self._manifest_text = self.manifest_text()
1265
1266    @synchronized
1267    def _my_api(self):
1268        if self._api_client is None:
1269            self._api_client = ThreadSafeAPIClient(self._config, version='v1')
1270            if self._keep_client is None:
1271                self._keep_client = self._api_client.keep
1272        return self._api_client
1273
1274    @synchronized
1275    def _my_keep(self):
1276        if self._keep_client is None:
1277            if self._api_client is None:
1278                self._my_api()
1279            else:
1280                self._keep_client = KeepClient(api_client=self._api_client)
1281        return self._keep_client
1282
1283    @synchronized
1284    def _my_block_manager(self):
1285        if self._block_manager is None:
1286            copies = (self.replication_desired or
1287                      self._my_api()._rootDesc.get('defaultCollectionReplication',
1288                                                   2))
1289            self._block_manager = _BlockManager(self._my_keep(),
1290                                                copies=copies,
1291                                                put_threads=self.put_threads,
1292                                                num_retries=self.num_retries,
1293                                                storage_classes_func=self.storage_classes_desired)
1294        return self._block_manager
1295
1296    def _remember_api_response(self, response):
1297        self._api_response = response
1298        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1299
1300    def _populate_from_api_server(self):
1301        # As in KeepClient itself, we must wait until the last
1302        # possible moment to instantiate an API client, in order to
1303        # avoid tripping up clients that don't have access to an API
1304        # server.  If we do build one, make sure our Keep client uses
1305        # it.  If instantiation fails, we'll fall back to the except
1306        # clause, just like any other Collection lookup
1307        # failure. Return an exception, or None if successful.
1308        self._remember_api_response(self._my_api().collections().get(
1309            uuid=self._manifest_locator).execute(
1310                num_retries=self.num_retries))
1311        self._manifest_text = self._api_response['manifest_text']
1312        self._portable_data_hash = self._api_response['portable_data_hash']
1313        # If not overriden via kwargs, we should try to load the
1314        # replication_desired and storage_classes_desired from the API server
1315        if self.replication_desired is None:
1316            self.replication_desired = self._api_response.get('replication_desired', None)
1317        if self._storage_classes_desired is None:
1318            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1319
1320    def _populate(self):
1321        if self._manifest_text is None:
1322            if self._manifest_locator is None:
1323                return
1324            else:
1325                self._populate_from_api_server()
1326        self._baseline_manifest = self._manifest_text
1327        self._import_manifest(self._manifest_text)
1328
1329    def _has_collection_uuid(self):
1330        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1331
1332    def _has_local_collection_uuid(self):
1333        return self._has_collection_uuid and \
1334            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1335
1336    def __enter__(self):
1337        return self
1338
1339    def __exit__(self, exc_type, exc_value, traceback):
1340        """Exit a context with this collection instance
1341
1342        If no exception was raised inside the context block, and this
1343        collection is writable and has a corresponding API record, that
1344        record will be updated to match the state of this instance at the end
1345        of the block.
1346        """
1347        if exc_type is None:
1348            if self.writable() and self._has_collection_uuid():
1349                self.save()
1350        self.stop_threads()
1351
1352    def stop_threads(self) -> None:
1353        """Stop background Keep upload/download threads"""
1354        if self._block_manager is not None:
1355            self._block_manager.stop_threads()
1356
1357    @synchronized
1358    def manifest_locator(self) -> Optional[str]:
1359        """Get this collection's manifest locator, if any
1360
1361        * If this collection instance is associated with an API record with a
1362          UUID, return that.
1363        * Otherwise, if this collection instance was loaded from an API record
1364          by portable data hash, return that.
1365        * Otherwise, return `None`.
1366        """
1367        return self._manifest_locator
1368
1369    @synchronized
1370    def clone(
1371            self,
1372            new_parent: Optional['Collection']=None,
1373            new_name: Optional[str]=None,
1374            readonly: bool=False,
1375            new_config: Optional[Mapping[str, str]]=None,
1376    ) -> 'Collection':
1377        """Create a Collection object with the same contents as this instance
1378
1379        This method creates a new Collection object with contents that match
1380        this instance's. The new collection will not be associated with any API
1381        record.
1382
1383        Arguments:
1384
1385        * new_parent: arvados.collection.Collection | None --- This value is
1386          passed to the new Collection's constructor as the `parent`
1387          argument.
1388
1389        * new_name: str | None --- This value is unused.
1390
1391        * readonly: bool --- If this value is true, this method constructs and
1392          returns a `CollectionReader`. Otherwise, it returns a mutable
1393          `Collection`. Default `False`.
1394
1395        * new_config: Mapping[str, str] | None --- This value is passed to the
1396          new Collection's constructor as `apiconfig`. If no value is provided,
1397          defaults to the configuration passed to this instance's constructor.
1398        """
1399        if new_config is None:
1400            new_config = self._config
1401        if readonly:
1402            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1403        else:
1404            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1405
1406        newcollection._clonefrom(self)
1407        return newcollection
1408
1409    @synchronized
1410    def api_response(self) -> Optional[Dict[str, Any]]:
1411        """Get this instance's associated API record
1412
1413        If this Collection instance has an associated API record, return it.
1414        Otherwise, return `None`.
1415        """
1416        return self._api_response
1417
1418    def find_or_create(
1419            self,
1420            path: str,
1421            create_type: CreateType,
1422    ) -> CollectionItem:
1423        if path == ".":
1424            return self
1425        else:
1426            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1427
1428    def find(self, path: str) -> CollectionItem:
1429        if path == ".":
1430            return self
1431        else:
1432            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1433
1434    def remove(self, path: str, recursive: bool=False) -> None:
1435        if path == ".":
1436            raise errors.ArgumentError("Cannot remove '.'")
1437        else:
1438            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1439
1440    @must_be_writable
1441    @synchronized
1442    @retry_method
1443    def save(
1444            self,
1445            properties: Optional[Properties]=None,
1446            storage_classes: Optional[StorageClasses]=None,
1447            trash_at: Optional[datetime.datetime]=None,
1448            merge: bool=True,
1449            num_retries: Optional[int]=None,
1450            preserve_version: bool=False,
1451    ) -> str:
1452        """Save collection to an existing API record
1453
1454        This method updates the instance's corresponding API record to match
1455        the instance's state. If this instance does not have a corresponding API
1456        record yet, raises `AssertionError`. (To create a new API record, use
1457        `Collection.save_new`.) This method returns the saved collection
1458        manifest.
1459
1460        Arguments:
1461
1462        * properties: dict[str, Any] | None --- If provided, the API record will
1463          be updated with these properties. Note this will completely replace
1464          any existing properties.
1465
1466        * storage_classes: list[str] | None --- If provided, the API record will
1467          be updated with this value in the `storage_classes_desired` field.
1468          This value will also be saved on the instance and used for any
1469          changes that follow.
1470
1471        * trash_at: datetime.datetime | None --- If provided, the API record
1472          will be updated with this value in the `trash_at` field.
1473
1474        * merge: bool --- If `True` (the default), this method will first
1475          reload this collection's API record, and merge any new contents into
1476          this instance before saving changes. See `Collection.update` for
1477          details.
1478
1479        * num_retries: int | None --- The number of times to retry reloading
1480          the collection's API record from the API server. If not specified,
1481          uses the `num_retries` provided when this instance was constructed.
1482
1483        * preserve_version: bool --- This value will be passed to directly
1484          to the underlying API call. If `True`, the Arvados API will
1485          preserve the versions of this collection both immediately before
1486          and after the update. If `True` when the API server is not
1487          configured with collection versioning, this method raises
1488          `arvados.errors.ArgumentError`.
1489        """
1490        if properties and type(properties) is not dict:
1491            raise errors.ArgumentError("properties must be dictionary type.")
1492
1493        if storage_classes and type(storage_classes) is not list:
1494            raise errors.ArgumentError("storage_classes must be list type.")
1495        if storage_classes:
1496            self._storage_classes_desired = storage_classes
1497
1498        if trash_at and type(trash_at) is not datetime.datetime:
1499            raise errors.ArgumentError("trash_at must be datetime type.")
1500
1501        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1502            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1503
1504        body={}
1505        if properties:
1506            body["properties"] = properties
1507        if self.storage_classes_desired():
1508            body["storage_classes_desired"] = self.storage_classes_desired()
1509        if trash_at:
1510            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1511            body["trash_at"] = t
1512        if preserve_version:
1513            body["preserve_version"] = preserve_version
1514
1515        if not self.committed():
1516            if self._has_remote_blocks:
1517                # Copy any remote blocks to the local cluster.
1518                self._copy_remote_blocks(remote_blocks={})
1519                self._has_remote_blocks = False
1520            if not self._has_collection_uuid():
1521                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1522            elif not self._has_local_collection_uuid():
1523                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1524
1525            self._my_block_manager().commit_all()
1526
1527            if merge:
1528                self.update()
1529
1530            text = self.manifest_text(strip=False)
1531            body['manifest_text'] = text
1532
1533            self._remember_api_response(self._my_api().collections().update(
1534                uuid=self._manifest_locator,
1535                body=body
1536                ).execute(num_retries=num_retries))
1537            self._manifest_text = self._api_response["manifest_text"]
1538            self._portable_data_hash = self._api_response["portable_data_hash"]
1539            self.set_committed(True)
1540        elif body:
1541            self._remember_api_response(self._my_api().collections().update(
1542                uuid=self._manifest_locator,
1543                body=body
1544                ).execute(num_retries=num_retries))
1545
1546        return self._manifest_text
1547
1548
1549    @must_be_writable
1550    @synchronized
1551    @retry_method
1552    def save_new(
1553            self,
1554            name: Optional[str]=None,
1555            create_collection_record: bool=True,
1556            owner_uuid: Optional[str]=None,
1557            properties: Optional[Properties]=None,
1558            storage_classes: Optional[StorageClasses]=None,
1559            trash_at: Optional[datetime.datetime]=None,
1560            ensure_unique_name: bool=False,
1561            num_retries: Optional[int]=None,
1562            preserve_version: bool=False,
1563    ):
1564        """Save collection to a new API record
1565
1566        This method finishes uploading new data blocks and (optionally)
1567        creates a new API collection record with the provided data. If a new
1568        record is created, this instance becomes associated with that record
1569        for future updates like `save()`. This method returns the saved
1570        collection manifest.
1571
1572        Arguments:
1573
1574        * name: str | None --- The `name` field to use on the new collection
1575          record. If not specified, a generic default name is generated.
1576
1577        * create_collection_record: bool --- If `True` (the default), creates a
1578          collection record on the API server. If `False`, the method finishes
1579          all data uploads and only returns the resulting collection manifest
1580          without sending it to the API server.
1581
1582        * owner_uuid: str | None --- The `owner_uuid` field to use on the
1583          new collection record.
1584
1585        * properties: dict[str, Any] | None --- The `properties` field to use on
1586          the new collection record.
1587
1588        * storage_classes: list[str] | None --- The
1589          `storage_classes_desired` field to use on the new collection record.
1590
1591        * trash_at: datetime.datetime | None --- The `trash_at` field to use
1592          on the new collection record.
1593
1594        * ensure_unique_name: bool --- This value is passed directly to the
1595          Arvados API when creating the collection record. If `True`, the API
1596          server may modify the submitted `name` to ensure the collection's
1597          `name`+`owner_uuid` combination is unique. If `False` (the default),
1598          if a collection already exists with this same `name`+`owner_uuid`
1599          combination, creating a collection record will raise a validation
1600          error.
1601
1602        * num_retries: int | None --- The number of times to retry reloading
1603          the collection's API record from the API server. If not specified,
1604          uses the `num_retries` provided when this instance was constructed.
1605
1606        * preserve_version: bool --- This value will be passed to directly
1607          to the underlying API call. If `True`, the Arvados API will
1608          preserve the versions of this collection both immediately before
1609          and after the update. If `True` when the API server is not
1610          configured with collection versioning, this method raises
1611          `arvados.errors.ArgumentError`.
1612        """
1613        if properties and type(properties) is not dict:
1614            raise errors.ArgumentError("properties must be dictionary type.")
1615
1616        if storage_classes and type(storage_classes) is not list:
1617            raise errors.ArgumentError("storage_classes must be list type.")
1618
1619        if trash_at and type(trash_at) is not datetime.datetime:
1620            raise errors.ArgumentError("trash_at must be datetime type.")
1621
1622        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1623            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1624
1625        if self._has_remote_blocks:
1626            # Copy any remote blocks to the local cluster.
1627            self._copy_remote_blocks(remote_blocks={})
1628            self._has_remote_blocks = False
1629
1630        if storage_classes:
1631            self._storage_classes_desired = storage_classes
1632
1633        self._my_block_manager().commit_all()
1634        text = self.manifest_text(strip=False)
1635
1636        if create_collection_record:
1637            if name is None:
1638                name = "New collection"
1639                ensure_unique_name = True
1640
1641            body = {"manifest_text": text,
1642                    "name": name,
1643                    "replication_desired": self.replication_desired}
1644            if owner_uuid:
1645                body["owner_uuid"] = owner_uuid
1646            if properties:
1647                body["properties"] = properties
1648            if self.storage_classes_desired():
1649                body["storage_classes_desired"] = self.storage_classes_desired()
1650            if trash_at:
1651                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1652                body["trash_at"] = t
1653            if preserve_version:
1654                body["preserve_version"] = preserve_version
1655
1656            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1657            text = self._api_response["manifest_text"]
1658
1659            self._manifest_locator = self._api_response["uuid"]
1660            self._portable_data_hash = self._api_response["portable_data_hash"]
1661
1662            self._manifest_text = text
1663            self.set_committed(True)
1664
1665        return text
1666
    # Splits manifest text into (token, following whitespace) pairs. The
    # separator group lets `_import_manifest` detect the "\n" that ends a
    # stream line.
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # Matches a block locator token: 32 lowercase hex digits (presumably the
    # block's MD5 hash), "+<size>", then any "+hint" suffixes. Group 1 is the
    # block size in bytes.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # Matches a file segment token "<position>:<size>:<escaped name>".
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1670
1671    def _unescape_manifest_path(self, path):
1672        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1673
1674    @synchronized
1675    def _import_manifest(self, manifest_text):
1676        """Import a manifest into a `Collection`.
1677
1678        :manifest_text:
1679          The manifest text to import from.
1680
1681        """
1682        if len(self) > 0:
1683            raise ArgumentError("Can only import manifest into an empty collection")
1684
1685        STREAM_NAME = 0
1686        BLOCKS = 1
1687        SEGMENTS = 2
1688
1689        stream_name = None
1690        state = STREAM_NAME
1691
1692        for token_and_separator in self._token_re.finditer(manifest_text):
1693            tok = token_and_separator.group(1)
1694            sep = token_and_separator.group(2)
1695
1696            if state == STREAM_NAME:
1697                # starting a new stream
1698                stream_name = self._unescape_manifest_path(tok)
1699                blocks = []
1700                segments = []
1701                streamoffset = 0
1702                state = BLOCKS
1703                self.find_or_create(stream_name, COLLECTION)
1704                continue
1705
1706            if state == BLOCKS:
1707                block_locator = self._block_re.match(tok)
1708                if block_locator:
1709                    blocksize = int(block_locator.group(1))
1710                    blocks.append(streams.Range(tok, streamoffset, blocksize, 0))
1711                    streamoffset += blocksize
1712                else:
1713                    state = SEGMENTS
1714
1715            if state == SEGMENTS:
1716                file_segment = self._segment_re.match(tok)
1717                if file_segment:
1718                    pos = int(file_segment.group(1))
1719                    size = int(file_segment.group(2))
1720                    name = self._unescape_manifest_path(file_segment.group(3))
1721                    if name.split('/')[-1] == '.':
1722                        # placeholder for persisting an empty directory, not a real file
1723                        if len(name) > 2:
1724                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1725                    else:
1726                        filepath = os.path.join(stream_name, name)
1727                        try:
1728                            afile = self.find_or_create(filepath, FILE)
1729                        except IOError as e:
1730                            if e.errno == errno.ENOTDIR:
1731                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1732                            else:
1733                                raise e from None
1734                        if isinstance(afile, ArvadosFile):
1735                            afile.add_segment(blocks, pos, size)
1736                        else:
1737                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1738                else:
1739                    # error!
1740                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1741
1742            if sep == "\n":
1743                stream_name = None
1744                state = STREAM_NAME
1745
1746        self.set_committed(True)
1747
1748    @synchronized
1749    def notify(
1750            self,
1751            event: ChangeType,
1752            collection: 'RichCollectionBase',
1753            name: str,
1754            item: CollectionItem,
1755    ) -> None:
1756        if self._callback:
1757            self._callback(event, collection, name, item)
1758
1759
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) within an Arvados
    `Collection`. It is returned by `Collection.find` and provides the same API.
    Operations that work on the API collection record propagate to the parent
    `Collection` object.
    """

    def __init__(self, parent, name):
        """Create a subcollection named `name` inside `parent`

        `parent` must be a real collection object: the constructor reads the
        root collection's lock and `parent.num_retries`.
        """
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock rather than creating a new one.
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self) -> 'Collection':
        """Return the root `Collection` by asking the parent"""
        return self.parent.root_collection()

    def writable(self) -> bool:
        """Report whether the root collection is writable"""
        return self.root_collection().writable()

    def _my_api(self):
        """Return the root collection's API client"""
        return self.root_collection()._my_api()

    def _my_keep(self):
        """Return the root collection's Keep client"""
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        """Return the root collection's block manager"""
        return self.root_collection()._my_block_manager()

    def stream_name(self) -> str:
        """Return this stream's path: the parent's stream name joined with `name`"""
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Return a copy of this subcollection placed under `new_parent`

        Although both arguments default to `None`, a real `new_parent` is
        required in practice because `Subcollection.__init__` dereferences
        the parent.
        """
        c = Subcollection(new_parent, new_name)
        c._clonefrom(self)
        return c

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Detach this subcollection from its parent and re-point it at `newparent`

        Marks this subcollection uncommitted, flushes it, removes it from
        its current parent, then updates `parent`, `name`, and `lock` for the
        new location. NOTE(review): inserting it into `newparent` appears to
        be the caller's job -- confirm against callers.
        """
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        r"""Return the manifest text for this stream

        An empty subcollection has no files to list, so it is encoded as a
        stream containing the empty block locator and a single zero-length
        file named `\056` (the escaped form of "."). Non-empty subcollections
        use the base class implementation.
        """
        if len(self._items) == 0:
            return "%s %s 0:0:\\056\n" % (
                streams.escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
        return super(Subcollection, self)._get_manifest_text(stream_name,
                                                             strip, normalize,
                                                             only_committed)
1823
1824
class CollectionReader(Collection):
    """Read-only `Collection` subclass

    This class never creates or updates API collection records. Use it for
    extra safety when you only need to read existing collections.
    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # writable() reports True only while the parent constructor runs,
        # presumably so the collection can be populated during loading.
        self._in_init = True
        super().__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Contents never change after construction, so locking is skipped.
        self.lock = NoopLock()

        # Kept for backwards compatibility with the old CollectionReader's
        # all_streams() and all_files().
        self._streams = None

    def writable(self) -> bool:
        """Return `True` only while this reader is still being constructed"""
        return self._in_init
ADD = 'add'

Argument value for Collection methods to represent an added item

DEL = 'del'

Argument value for Collection methods to represent a removed item

MOD = 'mod'

Argument value for Collection methods to represent a modified item

TOK = 'tok'

Argument value for Collection methods to represent an item with token differences

FILE = 'file'

create_type value for Collection.find_or_create that creates a new file (an ArvadosFile)

COLLECTION = 'collection'

create_type value for Collection.find_or_create that creates a new stream (a Subcollection)

ChangeList = typing.List[typing.Union[typing.Tuple[typing.Literal['add', 'del'], str, ForwardRef('Collection')], typing.Tuple[typing.Literal['mod', 'tok'], str, ForwardRef('Collection'), ForwardRef('Collection')]]]
ChangeType = typing.Literal['add', 'del', 'mod', 'tok']
CollectionItem = typing.Union[arvados.arvfile.ArvadosFile, ForwardRef('Collection')]
ChangeCallback = typing.Callable[[typing.Literal['add', 'del', 'mod', 'tok'], ForwardRef('Collection'), str, typing.Union[arvados.arvfile.ArvadosFile, ForwardRef('Collection')]], object]
CreateType = typing.Literal['collection', 'file']
Properties = typing.Dict[str, typing.Any]
StorageClasses = typing.List[str]
class CollectionBase:
 86class CollectionBase(object):
 87    """Abstract base class for Collection classes
 88
 89    .. ATTENTION:: Internal
 90       This class is meant to be used by other parts of the SDK. User code
 91       should instantiate or subclass `Collection` or one of its subclasses
 92       directly.
 93    """
 94
 95    def __enter__(self):
 96        """Enter a context block with this collection instance"""
 97        return self
 98
 99    def __exit__(self, exc_type, exc_value, traceback):
100        """Exit a context block with this collection instance"""
101        pass
102
103    def _my_keep(self):
104        if self._keep_client is None:
105            self._keep_client = KeepClient(api_client=self._api_client,
106                                           num_retries=self.num_retries)
107        return self._keep_client
108
109    def stripped_manifest(self) -> str:
110        """Create a copy of the collection manifest with only size hints
111
112        This method returns a string with the current collection's manifest
113        text with all non-portable locator hints like permission hints and
114        remote cluster hints removed. The only hints in the returned manifest
115        will be size hints.
116        """
117        raw = self.manifest_text()
118        clean = []
119        for line in raw.split("\n"):
120            fields = line.split()
121            if fields:
122                clean_fields = fields[:1] + [
123                    (re.sub(r'\+[^\d][^\+]*', '', x)
124                     if re.match(arvados.util.keep_locator_pattern, x)
125                     else x)
126                    for x in fields[1:]]
127                clean += [' '.join(clean_fields), "\n"]
128        return ''.join(clean)

Abstract base class for Collection classes

def stripped_manifest(self) -> str:
109    def stripped_manifest(self) -> str:
110        """Create a copy of the collection manifest with only size hints
111
112        This method returns a string with the current collection's manifest
113        text with all non-portable locator hints like permission hints and
114        remote cluster hints removed. The only hints in the returned manifest
115        will be size hints.
116        """
117        raw = self.manifest_text()
118        clean = []
119        for line in raw.split("\n"):
120            fields = line.split()
121            if fields:
122                clean_fields = fields[:1] + [
123                    (re.sub(r'\+[^\d][^\+]*', '', x)
124                     if re.match(arvados.util.keep_locator_pattern, x)
125                     else x)
126                    for x in fields[1:]]
127                clean += [' '.join(clean_fields), "\n"]
128        return ''.join(clean)

Create a copy of the collection manifest with only size hints

This method returns a string with the current collection’s manifest text with all non-portable locator hints like permission hints and remote cluster hints removed. The only hints in the returned manifest will be size hints.

class RichCollectionBase(CollectionBase):
 154class RichCollectionBase(CollectionBase):
 155    """Base class for Collection classes
 156
 157    .. ATTENTION:: Internal
 158       This class is meant to be used by other parts of the SDK. User code
 159       should instantiate or subclass `Collection` or one of its subclasses
 160       directly.
 161    """
 162
 163    def __init__(self, parent=None):
 164        self.parent = parent
 165        self._committed = False
 166        self._has_remote_blocks = False
 167        self._callback = None
 168        self._items = {}
 169
 170    def _my_api(self):
 171        raise NotImplementedError()
 172
 173    def _my_keep(self):
 174        raise NotImplementedError()
 175
 176    def _my_block_manager(self):
 177        raise NotImplementedError()
 178
 179    def writable(self) -> bool:
 180        """Indicate whether this collection object can be modified
 181
 182        This method returns `False` if this object is a `CollectionReader`,
 183        else `True`.
 184        """
 185        raise NotImplementedError()
 186
 187    def root_collection(self) -> 'Collection':
 188        """Get this collection's root collection object
 189
 190        If you open a subcollection with `Collection.find`, calling this method
 191        on that subcollection returns the source Collection object.
 192        """
 193        raise NotImplementedError()
 194
 195    def stream_name(self) -> str:
 196        """Get the name of the manifest stream represented by this collection
 197
 198        If you open a subcollection with `Collection.find`, calling this method
 199        on that subcollection returns the name of the stream you opened.
 200        """
 201        raise NotImplementedError()
 202
 203    @synchronized
 204    def has_remote_blocks(self) -> bool:
 205        """Indiciate whether the collection refers to remote data
 206
 207        Returns `True` if the collection manifest includes any Keep locators
 208        with a remote hint (`+R`), else `False`.
 209        """
 210        if self._has_remote_blocks:
 211            return True
 212        for item in self:
 213            if self[item].has_remote_blocks():
 214                return True
 215        return False
 216
 217    @synchronized
 218    def set_has_remote_blocks(self, val: bool) -> None:
 219        """Cache whether this collection refers to remote blocks
 220
 221        .. ATTENTION:: Internal
 222           This method is only meant to be used by other Collection methods.
 223
 224        Set this collection's cached "has remote blocks" flag to the given
 225        value.
 226        """
 227        self._has_remote_blocks = val
 228        if self.parent:
 229            self.parent.set_has_remote_blocks(val)
 230
 231    @must_be_writable
 232    @synchronized
 233    def find_or_create(
 234            self,
 235            path: str,
 236            create_type: CreateType,
 237    ) -> CollectionItem:
 238        """Get the item at the given path, creating it if necessary
 239
 240        If `path` refers to a stream in this collection, returns a
 241        corresponding `Subcollection` object. If `path` refers to a file in
 242        this collection, returns a corresponding
 243        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 244        this collection, then this method creates a new object and returns
 245        it, creating parent streams as needed. The type of object created is
 246        determined by the value of `create_type`.
 247
 248        Arguments:
 249
 250        * path: str --- The path to find or create within this collection.
 251
 252        * create_type: Literal[COLLECTION, FILE] --- The type of object to
 253          create at `path` if one does not exist. Passing `COLLECTION`
 254          creates a stream and returns the corresponding
 255          `Subcollection`. Passing `FILE` creates a new file and returns the
 256          corresponding `arvados.arvfile.ArvadosFile`.
 257        """
 258        pathcomponents = path.split("/", 1)
 259        if pathcomponents[0]:
 260            item = self._items.get(pathcomponents[0])
 261            if len(pathcomponents) == 1:
 262                if item is None:
 263                    # create new file
 264                    if create_type == COLLECTION:
 265                        item = Subcollection(self, pathcomponents[0])
 266                    else:
 267                        item = ArvadosFile(self, pathcomponents[0])
 268                    self._items[pathcomponents[0]] = item
 269                    self.set_committed(False)
 270                    self.notify(ADD, self, pathcomponents[0], item)
 271                return item
 272            else:
 273                if item is None:
 274                    # create new collection
 275                    item = Subcollection(self, pathcomponents[0])
 276                    self._items[pathcomponents[0]] = item
 277                    self.set_committed(False)
 278                    self.notify(ADD, self, pathcomponents[0], item)
 279                if isinstance(item, RichCollectionBase):
 280                    return item.find_or_create(pathcomponents[1], create_type)
 281                else:
 282                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 283        else:
 284            return self
 285
 286    @synchronized
 287    def find(self, path: str) -> CollectionItem:
 288        """Get the item at the given path
 289
 290        If `path` refers to a stream in this collection, returns a
 291        corresponding `Subcollection` object. If `path` refers to a file in
 292        this collection, returns a corresponding
 293        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 294        this collection, then this method raises `NotADirectoryError`.
 295
 296        Arguments:
 297
 298        * path: str --- The path to find or create within this collection.
 299        """
 300        if not path:
 301            raise errors.ArgumentError("Parameter 'path' is empty.")
 302
 303        pathcomponents = path.split("/", 1)
 304        if pathcomponents[0] == '':
 305            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 306
 307        item = self._items.get(pathcomponents[0])
 308        if item is None:
 309            return None
 310        elif len(pathcomponents) == 1:
 311            return item
 312        else:
 313            if isinstance(item, RichCollectionBase):
 314                if pathcomponents[1]:
 315                    return item.find(pathcomponents[1])
 316                else:
 317                    return item
 318            else:
 319                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 320
 321    @synchronized
 322    def mkdirs(self, path: str) -> 'Subcollection':
 323        """Create and return a subcollection at `path`
 324
 325        If `path` exists within this collection, raises `FileExistsError`.
 326        Otherwise, creates a stream at that path and returns the
 327        corresponding `Subcollection`.
 328        """
 329        if self.find(path) != None:
 330            raise IOError(errno.EEXIST, "Directory or file exists", path)
 331
 332        return self.find_or_create(path, COLLECTION)
 333
 334    def open(
 335            self,
 336            path: str,
 337            mode: str="r",
 338            encoding: Optional[str]=None
 339    ) -> IO:
 340        """Open a file-like object within the collection
 341
 342        This method returns a file-like object that can read and/or write the
 343        file located at `path` within the collection. If you attempt to write
 344        a `path` that does not exist, the file is created with `find_or_create`.
 345        If the file cannot be opened for any other reason, this method raises
 346        `OSError` with an appropriate errno.
 347
 348        Arguments:
 349
 350        * path: str --- The path of the file to open within this collection
 351
 352        * mode: str --- The mode to open this file. Supports all the same
 353          values as `builtins.open`.
 354
 355        * encoding: str | None --- The text encoding of the file. Only used
 356          when the file is opened in text mode. The default is
 357          platform-dependent.
 358
 359        """
 360        if not re.search(r'^[rwa][bt]?\+?$', mode):
 361            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 362
 363        if mode[0] == 'r' and '+' not in mode:
 364            fclass = ArvadosFileReader
 365            arvfile = self.find(path)
 366        elif not self.writable():
 367            raise IOError(errno.EROFS, "Collection is read only")
 368        else:
 369            fclass = ArvadosFileWriter
 370            arvfile = self.find_or_create(path, FILE)
 371
 372        if arvfile is None:
 373            raise IOError(errno.ENOENT, "File not found", path)
 374        if not isinstance(arvfile, ArvadosFile):
 375            raise IOError(errno.EISDIR, "Is a directory", path)
 376
 377        if mode[0] == 'w':
 378            arvfile.truncate(0)
 379
 380        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
 381        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
 382        if 'b' not in mode:
 383            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
 384            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
 385        return f
 386
 387    def modified(self) -> bool:
 388        """Indicate whether this collection has an API server record
 389
 390        Returns `False` if this collection corresponds to a record loaded from
 391        the API server, `True` otherwise.
 392        """
 393        return not self.committed()
 394
 395    @synchronized
 396    def committed(self):
 397        """Indicate whether this collection has an API server record
 398
 399        Returns `True` if this collection corresponds to a record loaded from
 400        the API server, `False` otherwise.
 401        """
 402        return self._committed
 403
 404    @synchronized
 405    def set_committed(self, value: bool=True):
 406        """Cache whether this collection has an API server record
 407
 408        .. ATTENTION:: Internal
 409           This method is only meant to be used by other Collection methods.
 410
 411        Set this collection's cached "committed" flag to the given
 412        value and propagates it as needed.
 413        """
 414        if value == self._committed:
 415            return
 416        if value:
 417            for k,v in self._items.items():
 418                v.set_committed(True)
 419            self._committed = True
 420        else:
 421            self._committed = False
 422            if self.parent is not None:
 423                self.parent.set_committed(False)
 424
 425    @synchronized
 426    def __iter__(self) -> Iterator[str]:
 427        """Iterate names of streams and files in this collection
 428
 429        This method does not recurse. It only iterates the contents of this
 430        collection's corresponding stream.
 431        """
 432        return iter(self._items)
 433
 434    @synchronized
 435    def __getitem__(self, k: str) -> CollectionItem:
 436        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
 437
 438        This method does not recurse. If you want to search a path, use
 439        `RichCollectionBase.find` instead.
 440        """
 441        return self._items[k]
 442
 443    @synchronized
 444    def __contains__(self, k: str) -> bool:
 445        """Indicate whether this collection has an item with this name
 446
 447        This method does not recurse. It you want to check a path, use
 448        `RichCollectionBase.exists` instead.
 449        """
 450        return k in self._items
 451
 452    @synchronized
 453    def __len__(self):
 454        """Get the number of items directly contained in this collection
 455
 456        This method does not recurse. It only counts the streams and files
 457        in this collection's corresponding stream.
 458        """
 459        return len(self._items)
 460
 461    @must_be_writable
 462    @synchronized
 463    def __delitem__(self, p: str) -> None:
 464        """Delete an item from this collection's stream
 465
 466        This method does not recurse. If you want to remove an item by a
 467        path, use `RichCollectionBase.remove` instead.
 468        """
 469        del self._items[p]
 470        self.set_committed(False)
 471        self.notify(DEL, self, p, None)
 472
 473    @synchronized
 474    def keys(self) -> Iterator[str]:
 475        """Iterate names of streams and files in this collection
 476
 477        This method does not recurse. It only iterates the contents of this
 478        collection's corresponding stream.
 479        """
 480        return self._items.keys()
 481
 482    @synchronized
 483    def values(self) -> List[CollectionItem]:
 484        """Get a list of objects in this collection's stream
 485
 486        The return value includes a `Subcollection` for every stream, and an
 487        `arvados.arvfile.ArvadosFile` for every file, directly within this
 488        collection's stream.  This method does not recurse.
 489        """
 490        return list(self._items.values())
 491
 492    @synchronized
 493    def items(self) -> List[Tuple[str, CollectionItem]]:
 494        """Get a list of `(name, object)` tuples from this collection's stream
 495
 496        The return value includes a `Subcollection` for every stream, and an
 497        `arvados.arvfile.ArvadosFile` for every file, directly within this
 498        collection's stream.  This method does not recurse.
 499        """
 500        return list(self._items.items())
 501
 502    def exists(self, path: str) -> bool:
 503        """Indicate whether this collection includes an item at `path`
 504
 505        This method returns `True` if `path` refers to a stream or file within
 506        this collection, else `False`.
 507
 508        Arguments:
 509
 510        * path: str --- The path to check for existence within this collection
 511        """
 512        return self.find(path) is not None
 513
 514    @must_be_writable
 515    @synchronized
 516    def remove(self, path: str, recursive: bool=False) -> None:
 517        """Remove the file or stream at `path`
 518
 519        Arguments:
 520
 521        * path: str --- The path of the item to remove from the collection
 522
 523        * recursive: bool --- Controls the method's behavior if `path` refers
 524          to a nonempty stream. If `False` (the default), this method raises
 525          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
 526          items under the stream.
 527        """
 528        if not path:
 529            raise errors.ArgumentError("Parameter 'path' is empty.")
 530
 531        pathcomponents = path.split("/", 1)
 532        item = self._items.get(pathcomponents[0])
 533        if item is None:
 534            raise IOError(errno.ENOENT, "File not found", path)
 535        if len(pathcomponents) == 1:
 536            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
 537                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
 538            deleteditem = self._items[pathcomponents[0]]
 539            del self._items[pathcomponents[0]]
 540            self.set_committed(False)
 541            self.notify(DEL, self, pathcomponents[0], deleteditem)
 542        else:
 543            item.remove(pathcomponents[1], recursive=recursive)
 544
 545    def _clonefrom(self, source):
 546        for k,v in source.items():
 547            self._items[k] = v.clone(self, k)
 548
 549    def clone(self):
 550        raise NotImplementedError()
 551
 552    @must_be_writable
 553    @synchronized
 554    def add(
 555            self,
 556            source_obj: CollectionItem,
 557            target_name: str,
 558            overwrite: bool=False,
 559            reparent: bool=False,
 560    ) -> None:
 561        """Copy or move a file or subcollection object to this collection
 562
 563        Arguments:
 564
 565        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
 566          to add to this collection
 567
 568        * target_name: str --- The path inside this collection where
 569          `source_obj` should be added.
 570
 571        * overwrite: bool --- Controls the behavior of this method when the
 572          collection already contains an object at `target_name`. If `False`
 573          (the default), this method will raise `FileExistsError`. If `True`,
 574          the object at `target_name` will be replaced with `source_obj`.
 575
 576        * reparent: bool --- Controls whether this method copies or moves
 577          `source_obj`. If `False` (the default), `source_obj` is copied into
 578          this collection. If `True`, `source_obj` is moved into this
 579          collection.
 580        """
 581        if target_name in self and not overwrite:
 582            raise IOError(errno.EEXIST, "File already exists", target_name)
 583
 584        modified_from = None
 585        if target_name in self:
 586            modified_from = self[target_name]
 587
 588        # Actually make the move or copy.
 589        if reparent:
 590            source_obj._reparent(self, target_name)
 591            item = source_obj
 592        else:
 593            item = source_obj.clone(self, target_name)
 594
 595        self._items[target_name] = item
 596        self.set_committed(False)
 597        if not self._has_remote_blocks and source_obj.has_remote_blocks():
 598            self.set_has_remote_blocks(True)
 599
 600        if modified_from:
 601            self.notify(MOD, self, target_name, (modified_from, item))
 602        else:
 603            self.notify(ADD, self, target_name, item)
 604
 605    def _get_src_target(self, source, target_path, source_collection, create_dest):
 606        if source_collection is None:
 607            source_collection = self
 608
 609        # Find the object
 610        if isinstance(source, str):
 611            source_obj = source_collection.find(source)
 612            if source_obj is None:
 613                raise IOError(errno.ENOENT, "File not found", source)
 614            sourcecomponents = source.split("/")
 615        else:
 616            source_obj = source
 617            sourcecomponents = None
 618
 619        # Find parent collection the target path
 620        targetcomponents = target_path.split("/")
 621
 622        # Determine the name to use.
 623        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 624
 625        if not target_name:
 626            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 627
 628        if create_dest:
 629            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 630        else:
 631            if len(targetcomponents) > 1:
 632                target_dir = self.find("/".join(targetcomponents[0:-1]))
 633            else:
 634                target_dir = self
 635
 636        if target_dir is None:
 637            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 638
 639        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
 640            target_dir = target_dir[target_name]
 641            target_name = sourcecomponents[-1]
 642
 643        return (source_obj, target_dir, target_name)
 644
 645    @must_be_writable
 646    @synchronized
 647    def copy(
 648            self,
 649            source: Union[str, CollectionItem],
 650            target_path: str,
 651            source_collection: Optional['RichCollectionBase']=None,
 652            overwrite: bool=False,
 653    ) -> None:
 654        """Copy a file or subcollection object to this collection
 655
 656        Arguments:
 657
 658        * source: str | arvados.arvfile.ArvadosFile |
 659          arvados.collection.Subcollection --- The file or subcollection to
 660          add to this collection. If `source` is a str, the object will be
 661          found by looking up this path from `source_collection` (see
 662          below).
 663
 664        * target_path: str --- The path inside this collection where the
 665          source object should be added.
 666
 667        * source_collection: arvados.collection.Collection | None --- The
 668          collection to find the source object from when `source` is a
 669          path. Defaults to the current collection (`self`).
 670
 671        * overwrite: bool --- Controls the behavior of this method when the
 672          collection already contains an object at `target_path`. If `False`
 673          (the default), this method will raise `FileExistsError`. If `True`,
 674          the object at `target_path` will be replaced with `source_obj`.
 675        """
 676        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 677        target_dir.add(source_obj, target_name, overwrite, False)
 678
 679    @must_be_writable
 680    @synchronized
 681    def rename(
 682            self,
 683            source: Union[str, CollectionItem],
 684            target_path: str,
 685            source_collection: Optional['RichCollectionBase']=None,
 686            overwrite: bool=False,
 687    ) -> None:
 688        """Move a file or subcollection object to this collection
 689
 690        Arguments:
 691
 692        * source: str | arvados.arvfile.ArvadosFile |
 693          arvados.collection.Subcollection --- The file or subcollection to
 694          add to this collection. If `source` is a str, the object will be
 695          found by looking up this path from `source_collection` (see
 696          below).
 697
 698        * target_path: str --- The path inside this collection where the
 699          source object should be added.
 700
 701        * source_collection: arvados.collection.Collection | None --- The
 702          collection to find the source object from when `source` is a
 703          path. Defaults to the current collection (`self`).
 704
 705        * overwrite: bool --- Controls the behavior of this method when the
 706          collection already contains an object at `target_path`. If `False`
 707          (the default), this method will raise `FileExistsError`. If `True`,
 708          the object at `target_path` will be replaced with `source_obj`.
 709        """
 710        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 711        if not source_obj.writable():
 712            raise IOError(errno.EROFS, "Source collection is read only", source)
 713        target_dir.add(source_obj, target_name, overwrite, True)
 714
 715    def portable_manifest_text(self, stream_name: str=".") -> str:
 716        """Get the portable manifest text for this collection
 717
 718        The portable manifest text is normalized, and does not include access
 719        tokens. This method does not flush outstanding blocks to Keep.
 720
 721        Arguments:
 722
 723        * stream_name: str --- The name to use for this collection's stream in
 724          the generated manifest. Default `'.'`.
 725        """
 726        return self._get_manifest_text(stream_name, True, True)
 727
 728    @synchronized
 729    def manifest_text(
 730            self,
 731            stream_name: str=".",
 732            strip: bool=False,
 733            normalize: bool=False,
 734            only_committed: bool=False,
 735    ) -> str:
 736        """Get the manifest text for this collection
 737
 738        Arguments:
 739
 740        * stream_name: str --- The name to use for this collection's stream in
 741          the generated manifest. Default `'.'`.
 742
 743        * strip: bool --- Controls whether or not the returned manifest text
 744          includes access tokens. If `False` (the default), the manifest text
 745          will include access tokens. If `True`, the manifest text will not
 746          include access tokens.
 747
 748        * normalize: bool --- Controls whether or not the returned manifest
 749          text is normalized. Default `False`.
 750
 751        * only_committed: bool --- Controls whether or not this method uploads
 752          pending data to Keep before building and returning the manifest text.
 753          If `False` (the default), this method will finish uploading all data
 754          to Keep, then return the final manifest. If `True`, this method will
 755          build and return a manifest that only refers to the data that has
 756          finished uploading at the time this method was called.
 757        """
 758        if not only_committed:
 759            self._my_block_manager().commit_all()
 760        return self._get_manifest_text(stream_name, strip, normalize,
 761                                       only_committed=only_committed)
 762
 763    @synchronized
 764    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
 765        """Get the manifest text for this collection, sub collections and files.
 766
 767        :stream_name:
 768          Name to use for this stream (directory)
 769
 770        :strip:
 771          If True, remove signing tokens from block locators if present.
 772          If False (default), block locators are left unchanged.
 773
 774        :normalize:
 775          If True, always export the manifest text in normalized form
 776          even if the Collection is not modified.  If False (default) and the collection
 777          is not modified, return the original manifest text even if it is not
 778          in normalized form.
 779
 780        :only_committed:
 781          If True, only include blocks that were already committed to Keep.
 782
 783        """
 784
 785        if not self.committed() or self._manifest_text is None or normalize:
 786            stream = {}
 787            buf = []
 788            sorted_keys = sorted(self.keys())
 789            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
 790                # Create a stream per file `k`
 791                arvfile = self[filename]
 792                filestream = []
 793                for segment in arvfile.segments():
 794                    loc = segment.locator
 795                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
 796                        if only_committed:
 797                            continue
 798                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
 799                    if strip:
 800                        loc = KeepLocator(loc).stripped()
 801                    filestream.append(streams.LocatorAndRange(
 802                        loc,
 803                        KeepLocator(loc).size,
 804                        segment.segment_offset,
 805                        segment.range_size,
 806                    ))
 807                stream[filename] = filestream
 808            if stream:
 809                buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n")
 810            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
 811                buf.append(self[dirname].manifest_text(
 812                    stream_name=os.path.join(stream_name, dirname),
 813                    strip=strip, normalize=True, only_committed=only_committed))
 814            return "".join(buf)
 815        else:
 816            if strip:
 817                return self.stripped_manifest()
 818            else:
 819                return self._manifest_text
 820
 821    @synchronized
 822    def _copy_remote_blocks(self, remote_blocks={}):
 823        """Scan through the entire collection and ask Keep to copy remote blocks.
 824
 825        When accessing a remote collection, blocks will have a remote signature
 826        (+R instead of +A). Collect these signatures and request Keep to copy the
 827        blocks to the local cluster, returning local (+A) signatures.
 828
 829        :remote_blocks:
 830          Shared cache of remote to local block mappings. This is used to avoid
 831          doing extra work when blocks are shared by more than one file in
 832          different subdirectories.
 833
 834        """
 835        for item in self:
 836            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
 837        return remote_blocks
 838
 839    @synchronized
 840    def diff(
 841            self,
 842            end_collection: 'RichCollectionBase',
 843            prefix: str=".",
 844            holding_collection: Optional['Collection']=None,
 845    ) -> ChangeList:
 846        """Build a list of differences between this collection and another
 847
 848        Arguments:
 849
 850        * end_collection: arvados.collection.RichCollectionBase --- A
 851          collection object with the desired end state. The returned diff
 852          list will describe how to go from the current collection object
 853          `self` to `end_collection`.
 854
 855        * prefix: str --- The name to use for this collection's stream in
 856          the diff list. Default `'.'`.
 857
 858        * holding_collection: arvados.collection.Collection | None --- A
 859          collection object used to hold objects for the returned diff
 860          list. By default, a new empty collection is created.
 861        """
 862        changes = []
 863        if holding_collection is None:
 864            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
 865        for k in self:
 866            if k not in end_collection:
 867               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
 868        for k in end_collection:
 869            if k in self:
 870                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
 871                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
 872                elif end_collection[k] != self[k]:
 873                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 874                else:
 875                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 876            else:
 877                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
 878        return changes
 879
 880    @must_be_writable
 881    @synchronized
 882    def apply(self, changes: ChangeList) -> None:
 883        """Apply a list of changes from to this collection
 884
 885        This method takes a list of changes generated by
 886        `RichCollectionBase.diff` and applies it to this
 887        collection. Afterward, the state of this collection object will
 888        match the state of `end_collection` passed to `diff`. If a change
 889        conflicts with a local change, it will be saved to an alternate path
 890        indicating the conflict.
 891
 892        Arguments:
 893
 894        * changes: arvados.collection.ChangeList --- The list of differences
 895          generated by `RichCollectionBase.diff`.
 896        """
 897        if changes:
 898            self.set_committed(False)
 899        for change in changes:
 900            event_type = change[0]
 901            path = change[1]
 902            initial = change[2]
 903            local = self.find(path)
 904            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
 905                                                                    time.gmtime()))
 906            if event_type == ADD:
 907                if local is None:
 908                    # No local file at path, safe to copy over new file
 909                    self.copy(initial, path)
 910                elif local is not None and local != initial:
 911                    # There is already local file and it is different:
 912                    # save change to conflict file.
 913                    self.copy(initial, conflictpath)
 914            elif event_type == MOD or event_type == TOK:
 915                final = change[3]
 916                if local == initial:
 917                    # Local matches the "initial" item so it has not
 918                    # changed locally and is safe to update.
 919                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
 920                        # Replace contents of local file with new contents
 921                        local.replace_contents(final)
 922                    else:
 923                        # Overwrite path with new item; this can happen if
 924                        # path was a file and is now a collection or vice versa
 925                        self.copy(final, path, overwrite=True)
 926                else:
 927                    # Local is missing (presumably deleted) or local doesn't
 928                    # match the "start" value, so save change to conflict file
 929                    self.copy(final, conflictpath)
 930            elif event_type == DEL:
 931                if local == initial:
 932                    # Local item matches "initial" value, so it is safe to remove.
 933                    self.remove(path, recursive=True)
 934                # else, the file is modified or already removed, in either
 935                # case we don't want to try to remove it.
 936
 937    def portable_data_hash(self) -> str:
 938        """Get the portable data hash for this collection's manifest"""
 939        if self._manifest_locator and self.committed():
 940            # If the collection is already saved on the API server, and it's committed
 941            # then return API server's PDH response.
 942            return self._portable_data_hash
 943        else:
 944            stripped = self.portable_manifest_text().encode()
 945            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 946
 947    @synchronized
 948    def subscribe(self, callback: ChangeCallback) -> None:
 949        """Set a notify callback for changes to this collection
 950
 951        Arguments:
 952
 953        * callback: arvados.collection.ChangeCallback --- The callable to
 954          call each time the collection is changed.
 955        """
 956        if self._callback is None:
 957            self._callback = callback
 958        else:
 959            raise errors.ArgumentError("A callback is already set on this collection.")
 960
 961    @synchronized
 962    def unsubscribe(self) -> None:
 963        """Remove any notify callback set for changes to this collection"""
 964        if self._callback is not None:
 965            self._callback = None
 966
 967    @synchronized
 968    def notify(
 969            self,
 970            event: ChangeType,
 971            collection: 'RichCollectionBase',
 972            name: str,
 973            item: CollectionItem,
 974    ) -> None:
 975        """Notify any subscribed callback about a change to this collection
 976
 977        .. ATTENTION:: Internal
 978           This method is only meant to be used by other Collection methods.
 979
 980        If a callback has been registered with `RichCollectionBase.subscribe`,
 981        it will be called with information about a change to this collection.
 982        Then this notification will be propagated to this collection's root.
 983
 984        Arguments:
 985
 986        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 987          the collection.
 988
 989        * collection: arvados.collection.RichCollectionBase --- The
 990          collection that was modified.
 991
 992        * name: str --- The name of the file or stream within `collection` that
 993          was modified.
 994
 995        * item: arvados.arvfile.ArvadosFile |
 996          arvados.collection.Subcollection --- The new contents at `name`
 997          within `collection`.
 998        """
 999        if self._callback:
1000            self._callback(event, collection, name, item)
1001        self.root_collection().notify(event, collection, name, item)
1002
1003    @synchronized
1004    def __eq__(self, other: Any) -> bool:
1005        """Indicate whether this collection object is equal to another"""
1006        if other is self:
1007            return True
1008        if not isinstance(other, RichCollectionBase):
1009            return False
1010        if len(self._items) != len(other):
1011            return False
1012        for k in self._items:
1013            if k not in other:
1014                return False
1015            if self._items[k] != other[k]:
1016                return False
1017        return True
1018
1019    def __ne__(self, other: Any) -> bool:
1020        """Indicate whether this collection object is not equal to another"""
1021        return not self.__eq__(other)
1022
1023    @synchronized
1024    def flush(self) -> None:
1025        """Upload any pending data to Keep"""
1026        for e in self.values():
1027            e.flush()

Base class for Collection classes

RichCollectionBase(parent=None)
163    def __init__(self, parent=None):
164        self.parent = parent
165        self._committed = False
166        self._has_remote_blocks = False
167        self._callback = None
168        self._items = {}
parent
def writable(self) -> bool:
179    def writable(self) -> bool:
180        """Indicate whether this collection object can be modified
181
182        This method returns `False` if this object is a `CollectionReader`,
183        else `True`.
184        """
185        raise NotImplementedError()

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

def root_collection(self) -> Collection:
187    def root_collection(self) -> 'Collection':
188        """Get this collection's root collection object
189
190        If you open a subcollection with `Collection.find`, calling this method
191        on that subcollection returns the source Collection object.
192        """
193        raise NotImplementedError()

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def stream_name(self) -> str:
195    def stream_name(self) -> str:
196        """Get the name of the manifest stream represented by this collection
197
198        If you open a subcollection with `Collection.find`, calling this method
199        on that subcollection returns the name of the stream you opened.
200        """
201        raise NotImplementedError()

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

@synchronized
def has_remote_blocks(self) -> bool:
203    @synchronized
204    def has_remote_blocks(self) -> bool:
205        """Indiciate whether the collection refers to remote data
206
207        Returns `True` if the collection manifest includes any Keep locators
208        with a remote hint (`+R`), else `False`.
209        """
210        if self._has_remote_blocks:
211            return True
212        for item in self:
213            if self[item].has_remote_blocks():
214                return True
215        return False

Indicate whether the collection refers to remote data

Returns True if the collection manifest includes any Keep locators with a remote hint (+R), else False.

@synchronized
def set_has_remote_blocks(self, val: bool) -> None:
217    @synchronized
218    def set_has_remote_blocks(self, val: bool) -> None:
219        """Cache whether this collection refers to remote blocks
220
221        .. ATTENTION:: Internal
222           This method is only meant to be used by other Collection methods.
223
224        Set this collection's cached "has remote blocks" flag to the given
225        value.
226        """
227        self._has_remote_blocks = val
228        if self.parent:
229            self.parent.set_has_remote_blocks(val)

Cache whether this collection refers to remote blocks

Set this collection’s cached “has remote blocks” flag to the given value.

@must_be_writable
@synchronized
def find_or_create(
        self,
        path: str,
        create_type: CreateType,
) -> CollectionItem:
    """Return the item at `path`, creating it if it does not exist

    A stream at `path` is returned as a `Subcollection`; a file is returned
    as an `arvados.arvfile.ArvadosFile`. When nothing exists at `path`, a
    new object of the kind selected by `create_type` is created and
    returned, along with any missing parent streams. If `path` is empty or
    starts with `/`, this collection itself is returned.

    Arguments:

    * path: str --- The path to look up or create within this collection.

    * create_type: Literal[COLLECTION, FILE] --- What to create at `path`
      when it is absent. `COLLECTION` creates a stream and returns its
      `Subcollection`; `FILE` creates a file and returns its
      `arvados.arvfile.ArvadosFile`.

    Raises `NotADirectoryError` if a component followed by `/` names a
    file rather than a stream.
    """
    head, sep, rest = path.partition("/")
    if not head:
        return self

    def attach(new_item):
        # Register a freshly created child and announce the change.
        self._items[head] = new_item
        self.set_committed(False)
        self.notify(ADD, self, head, new_item)
        return new_item

    item = self._items.get(head)
    if not sep:
        # Final component: this is the object the caller asked for.
        if item is None:
            if create_type == COLLECTION:
                item = attach(Subcollection(self, head))
            else:
                item = attach(ArvadosFile(self, head))
        return item

    # Intermediate component: it must be, or become, a stream.
    if item is None:
        item = attach(Subcollection(self, head))
    if not isinstance(item, RichCollectionBase):
        raise IOError(errno.ENOTDIR, "Not a directory", head)
    return item.find_or_create(rest, create_type)

Get the item at the given path, creating it if necessary

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of create_type.

Arguments:

  • path: str — The path to find or create within this collection.

  • create_type: Literal[COLLECTION, FILE] — The type of object to create at path if one does not exist. Passing COLLECTION creates a stream and returns the corresponding Subcollection. Passing FILE creates a new file and returns the corresponding arvados.arvfile.ArvadosFile.

@synchronized
def find(self, path: str) -> CollectionItem:
    """Get the item at the given path

    If `path` refers to a stream in this collection, returns a
    corresponding `Subcollection` object. If `path` refers to a file in
    this collection, returns a corresponding
    `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
    this collection, returns `None`.

    Arguments:

    * path: str --- The path to find within this collection.

    Raises:

    * `arvados.errors.ArgumentError` if `path` is empty.

    * `NotADirectoryError` if `path` starts with `/` or contains `//`, or
      if a component followed by `/` names a file rather than a stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    pathcomponents = path.split("/", 1)
    if pathcomponents[0] == '':
        # A leading "/" (or "//" deeper in the path) yields an empty name.
        raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

    item = self._items.get(pathcomponents[0])
    if item is None:
        return None
    elif len(pathcomponents) == 1:
        return item
    else:
        if isinstance(item, RichCollectionBase):
            if pathcomponents[1]:
                return item.find(pathcomponents[1])
            else:
                # Trailing "/" after a stream name: return the stream itself.
                return item
        else:
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

Get the item at the given path

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, this method returns None. It raises NotADirectoryError if path starts with / or contains //, or if a component followed by / names a file rather than a stream.

Arguments:

  • path: str — The path to find within this collection.
@synchronized
def mkdirs(self, path: str) -> 'Subcollection':
    """Create and return a subcollection at `path`

    If `path` exists within this collection, raises `FileExistsError`.
    Otherwise, creates a stream at that path (plus any missing parent
    streams) and returns the corresponding `Subcollection`.
    """
    # Use an identity test: `!=` would dispatch to the found item's own
    # comparison method instead of simply checking for None.
    if self.find(path) is not None:
        raise IOError(errno.EEXIST, "Directory or file exists", path)

    return self.find_or_create(path, COLLECTION)

Create and return a subcollection at path

If path exists within this collection, raises FileExistsError. Otherwise, creates a stream at that path and returns the corresponding Subcollection.

def open(
        self,
        path: str,
        mode: str="r",
        encoding: Optional[str]=None
) -> IO:
    """Open a file-like object within the collection

    Returns an object that reads and/or writes the file stored at `path`
    in this collection. Writing to a `path` that does not exist yet creates
    the file through `find_or_create`. Any other reason the file cannot be
    opened is reported as `OSError` with a matching errno.

    Arguments:

    * path: str --- The path of the file to open within this collection

    * mode: str --- The mode to open this file. Supports all the same
      values as `builtins.open`.

    * encoding: str | None --- The text encoding of the file. Only used
      when the file is opened in text mode. The default is
      platform-dependent.
    """
    if re.search(r'^[rwa][bt]?\+?$', mode) is None:
        raise errors.ArgumentError("Invalid mode {!r}".format(mode))

    read_only = mode[0] == 'r' and '+' not in mode
    if read_only:
        fclass = ArvadosFileReader
        arvfile = self.find(path)
    elif self.writable():
        fclass = ArvadosFileWriter
        arvfile = self.find_or_create(path, FILE)
    else:
        raise IOError(errno.EROFS, "Collection is read only")

    if arvfile is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if not isinstance(arvfile, ArvadosFile):
        raise IOError(errno.EISDIR, "Is a directory", path)

    if mode.startswith('w'):
        arvfile.truncate(0)

    # The underlying file object always works in binary; text mode is
    # layered on top with a buffered TextIOWrapper.
    binmode = "{}b{}".format(mode[0], re.sub('[bt]', '', mode[1:]))
    handle = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
    if 'b' in mode:
        return handle
    bufferclass = io.BufferedRandom if handle.writable() else io.BufferedReader
    return io.TextIOWrapper(bufferclass(WrappableFile(handle)), encoding=encoding)

Open a file-like object within the collection

This method returns a file-like object that can read and/or write the file located at path within the collection. If you attempt to write a path that does not exist, the file is created with find_or_create. If the file cannot be opened for any other reason, this method raises OSError with an appropriate errno.

Arguments:

  • path: str — The path of the file to open within this collection

  • mode: str — The mode to open this file. Supports all the same values as builtins.open.

  • encoding: str | None — The text encoding of the file. Only used when the file is opened in text mode. The default is platform-dependent.

def modified(self) -> bool:
    """Indicate whether this collection has uncommitted changes

    This is the inverse of `committed`: returns `False` if this collection
    corresponds to a record loaded from the API server, `True` otherwise.
    """
    return not self.committed()

Indicate whether this collection has uncommitted changes (the inverse of committed)

Returns False if this collection corresponds to a record loaded from the API server, True otherwise.

@synchronized
def committed(self):
    """Report whether this collection matches an API server record

    The result is `True` when this collection corresponds to a record
    loaded from the API server, and `False` otherwise.
    """
    is_committed = self._committed
    return is_committed

Indicate whether this collection has an API server record

Returns True if this collection corresponds to a record loaded from the API server, False otherwise.

@synchronized
def set_committed(self, value: bool=True):
    """Cache whether this collection has an API server record

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    Set this collection's cached "committed" flag to the given value and
    propagate it as needed. Marking a collection committed also marks
    every direct child committed; marking it uncommitted also marks its
    parent uncommitted.
    """
    if value == self._committed:
        return
    if value:
        # Committed state flows downward: every child is now committed too.
        for child in self._items.values():
            child.set_committed(True)
        self._committed = True
    else:
        # Uncommitted state flows upward: the ancestors now contain changes.
        self._committed = False
        if self.parent is not None:
            self.parent.set_committed(False)

Cache whether this collection has an API server record

Set this collection’s cached “committed” flag to the given value and propagate it as needed.

@synchronized
def keys(self) -> Iterator[str]:
    """Iterate over the names directly inside this collection's stream

    Both stream (subcollection) names and file names are produced. Nested
    streams are not descended into.
    """
    contents = self._items
    return contents.keys()

Iterate names of streams and files in this collection

This method does not recurse. It only iterates the contents of this collection’s corresponding stream.

@synchronized
def values(self) -> List[CollectionItem]:
    """Return the objects stored directly in this collection's stream

    Each stream becomes a `Subcollection` and each file an
    `arvados.arvfile.ArvadosFile` in the returned list. Only the immediate
    contents are included; nested streams are not expanded.
    """
    return [entry for entry in self._items.values()]

Get a list of objects in this collection’s stream

The return value includes a Subcollection for every stream, and an arvados.arvfile.ArvadosFile for every file, directly within this collection’s stream. This method does not recurse.

@synchronized
def items(self) -> List[Tuple[str, CollectionItem]]:
    """Return `(name, object)` pairs for this collection's stream

    Each stream is paired with a `Subcollection` and each file with an
    `arvados.arvfile.ArvadosFile`. Only the immediate contents are
    included; nested streams are not expanded.
    """
    return [(name, entry) for name, entry in self._items.items()]

Get a list of (name, object) tuples from this collection’s stream

The return value includes a Subcollection for every stream, and an arvados.arvfile.ArvadosFile for every file, directly within this collection’s stream. This method does not recurse.

def exists(self, path: str) -> bool:
    """Report whether something lives at `path` in this collection

    The answer is `True` when `path` names either a stream or a file within
    this collection, and `False` otherwise.

    Arguments:

    * path: str --- The path to check for existence within this collection
    """
    found = self.find(path)
    return found is not None

Indicate whether this collection includes an item at path

This method returns True if path refers to a stream or file within this collection, else False.

Arguments:

  • path: str — The path to check for existence within this collection
@must_be_writable
@synchronized
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the file or stream at `path`

    Arguments:

    * path: str --- The path of the item to remove from the collection

    * recursive: bool --- Controls the method's behavior if `path` refers
      to a nonempty stream. If `False` (the default), this method raises
      `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
      items under the stream.

    Raises:

    * `arvados.errors.ArgumentError` if `path` is empty.

    * `FileNotFoundError` if a component of `path` does not exist.

    * `NotADirectoryError` if a component followed by `/` names a file
      rather than a stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    pathcomponents = path.split("/", 1)
    name = pathcomponents[0]
    item = self._items.get(name)
    if item is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if len(pathcomponents) == 1:
        if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
            raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
        del self._items[name]
        self.set_committed(False)
        self.notify(DEL, self, name, item)
    elif isinstance(item, RichCollectionBase):
        item.remove(pathcomponents[1], recursive=recursive)
    else:
        # A file cannot contain further path components; report it the
        # same way find() and find_or_create() do.
        raise IOError(errno.ENOTDIR, "Not a directory", name)

Remove the file or stream at path

Arguments:

  • path: str — The path of the item to remove from the collection

  • recursive: bool — Controls the method’s behavior if path refers to a nonempty stream. If False (the default), this method raises OSError with errno ENOTEMPTY. If True, this method removes all items under the stream.

def clone(self):
    """Placeholder for copying this collection

    This base implementation always raises `NotImplementedError`. Callers
    such as `add` invoke `clone(new_parent, new_name)`, so concrete
    collection classes presumably override it with that signature.
    """
    raise NotImplementedError()
@must_be_writable
@synchronized
def add(
        self,
        source_obj: CollectionItem,
        target_name: str,
        overwrite: bool=False,
        reparent: bool=False,
) -> None:
    """Copy or move a file or subcollection object to this collection

    Arguments:

    * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
      to add to this collection

    * target_name: str --- The path inside this collection where
      `source_obj` should be added.

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_name`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_name` will be replaced with `source_obj`.

    * reparent: bool --- Controls whether this method copies or moves
      `source_obj`. If `False` (the default), `source_obj` is copied into
      this collection. If `True`, `source_obj` is moved into this
      collection.
    """
    modified_from = None
    if target_name in self:
        if not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)
        modified_from = self[target_name]

    # Actually make the move or copy.
    if reparent:
        source_obj._reparent(self, target_name)
        item = source_obj
    else:
        item = source_obj.clone(self, target_name)

    self._items[target_name] = item
    self.set_committed(False)
    if not self._has_remote_blocks and source_obj.has_remote_blocks():
        self.set_has_remote_blocks(True)

    # Compare against None explicitly: collections define __len__, so an
    # empty subcollection being replaced is falsy and would otherwise be
    # reported as ADD instead of MOD.
    if modified_from is not None:
        self.notify(MOD, self, target_name, (modified_from, item))
    else:
        self.notify(ADD, self, target_name, item)

Copy or move a file or subcollection object to this collection

Arguments:

  • source_obj: arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection

  • target_name: str — The path inside this collection where source_obj should be added.

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_name. If False (the default), this method will raise FileExistsError. If True, the object at target_name will be replaced with source_obj.

  • reparent: bool — Controls whether this method copies or moves source_obj. If False (the default), source_obj is copied into this collection. If True, source_obj is moved into this collection.

@must_be_writable
@synchronized
def copy(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Place a copy of a file or subcollection into this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- What to copy. A str is treated
      as a path and resolved against `source_collection` (see below).

    * target_path: str --- Where inside this collection the copy should
      be placed.

    * source_collection: arvados.collection.Collection | None --- The
      collection used to resolve `source` when it is a path. Defaults to
      this collection (`self`).

    * overwrite: bool --- What to do if `target_path` is already occupied.
      If `False` (the default), `FileExistsError` is raised. If `True`,
      the existing object is replaced by the copy.
    """
    resolved = self._get_src_target(source, target_path, source_collection, True)
    source_obj, target_dir, target_name = resolved
    # Final positional argument is `reparent=False`: copy, don't move.
    target_dir.add(source_obj, target_name, overwrite, False)

Copy a file or subcollection object to this collection

Arguments:

  • source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If source is a str, the object will be found by looking up this path from source_collection (see below).

  • target_path: str — The path inside this collection where the source object should be added.

  • source_collection: Collection | None — The collection to find the source object from when source is a path. Defaults to the current collection (self).

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_path. If False (the default), this method will raise FileExistsError. If True, the object at target_path will be replaced with the source object.

@must_be_writable
@synchronized
def rename(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Move a file or subcollection into this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- What to move. A str is treated
      as a path and resolved against `source_collection` (see below).

    * target_path: str --- Where inside this collection the object should
      end up.

    * source_collection: arvados.collection.Collection | None --- The
      collection used to resolve `source` when it is a path. Defaults to
      this collection (`self`).

    * overwrite: bool --- What to do if `target_path` is already occupied.
      If `False` (the default), `FileExistsError` is raised. If `True`,
      the existing object is replaced by the moved one.

    Raises `OSError` with errno `EROFS` if the source object is not
    writable.
    """
    resolved = self._get_src_target(source, target_path, source_collection, False)
    source_obj, target_dir, target_name = resolved
    if not source_obj.writable():
        # Moving detaches the object from its origin, which must be writable.
        raise IOError(errno.EROFS, "Source collection is read only", source)
    # Final positional argument is `reparent=True`: move, don't copy.
    target_dir.add(source_obj, target_name, overwrite, True)

Move a file or subcollection object to this collection

Arguments:

  • source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If source is a str, the object will be found by looking up this path from source_collection (see below).

  • target_path: str — The path inside this collection where the source object should be added.

  • source_collection: Collection | None — The collection to find the source object from when source is a path. Defaults to the current collection (self).

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_path. If False (the default), this method will raise FileExistsError. If True, the object at target_path will be replaced with the source object.

def portable_manifest_text(self, stream_name: str=".") -> str:
    """Build the portable manifest text for this collection

    The result is normalized and stripped of access tokens. Outstanding
    blocks are not flushed to Keep before the manifest is built.

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.
    """
    strip_tokens = True
    normalize = True
    return self._get_manifest_text(stream_name, strip_tokens, normalize)

Get the portable manifest text for this collection

The portable manifest text is normalized, and does not include access tokens. This method does not flush outstanding blocks to Keep.

Arguments:

  • stream_name: str — The name to use for this collection’s stream in the generated manifest. Default '.'.
@synchronized
def manifest_text(
        self,
        stream_name: str=".",
        strip: bool=False,
        normalize: bool=False,
        only_committed: bool=False,
) -> str:
    """Build the manifest text for this collection

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.

    * strip: bool --- When `True`, access tokens are left out of the
      manifest. When `False` (the default), they are included.

    * normalize: bool --- When `True`, the manifest is normalized. Default
      `False`.

    * only_committed: bool --- When `False` (the default), all pending
      data is first uploaded to Keep, so the manifest describes the final
      contents. When `True`, nothing is uploaded and the manifest only
      refers to data that had finished uploading when this method was
      called.
    """
    if not only_committed:
        # Push every pending block to Keep before describing the contents.
        block_manager = self._my_block_manager()
        block_manager.commit_all()
    return self._get_manifest_text(
        stream_name,
        strip,
        normalize,
        only_committed=only_committed,
    )

Get the manifest text for this collection

Arguments:

  • stream_name: str — The name to use for this collection’s stream in the generated manifest. Default '.'.

  • strip: bool — Controls whether or not the returned manifest text includes access tokens. If False (the default), the manifest text will include access tokens. If True, the manifest text will not include access tokens.

  • normalize: bool — Controls whether or not the returned manifest text is normalized. Default False.

  • only_committed: bool — Controls whether or not this method uploads pending data to Keep before building and returning the manifest text. If False (the default), this method will finish uploading all data to Keep, then return the final manifest. If True, this method will build and return a manifest that only refers to the data that has finished uploading at the time this method was called.

@synchronized
def diff(
        self,
        end_collection: 'RichCollectionBase',
        prefix: str=".",
        holding_collection: Optional['Collection']=None,
) -> ChangeList:
    """Describe how to turn this collection into `end_collection`

    Names present only here produce `DEL` entries, and names present only
    in `end_collection` produce `ADD` entries. Names present in both
    either recurse (when both sides are subcollections), produce `MOD`
    (when they differ), or produce `TOK` (when they compare equal). Every
    object in the result is a clone held by `holding_collection`.

    Arguments:

    * end_collection: arvados.collection.RichCollectionBase --- The
      collection holding the desired end state.

    * prefix: str --- The name to use for this collection's stream in
      the diff list. Default `'.'`.

    * holding_collection: arvados.collection.Collection | None --- The
      collection that owns the cloned objects in the returned list. By
      default, a new empty collection is created.
    """
    if holding_collection is None:
        holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())

    def held(obj):
        # Detached copy owned by the holding collection.
        return obj.clone(holding_collection, "")

    changes = []
    for name in self:
        if name not in end_collection:
            changes.append((DEL, os.path.join(prefix, name), held(self[name])))

    for name in end_collection:
        entry_path = os.path.join(prefix, name)
        target = end_collection[name]
        if name not in self:
            changes.append((ADD, entry_path, held(target)))
            continue
        current = self[name]
        if isinstance(target, Subcollection) and isinstance(current, Subcollection):
            changes.extend(current.diff(target, entry_path, holding_collection))
        elif target != current:
            changes.append((MOD, entry_path, held(current), held(target)))
        else:
            changes.append((TOK, entry_path, held(current), held(target)))
    return changes

Build a list of differences between this collection and another

Arguments:

  • end_collection: RichCollectionBase — A collection object with the desired end state. The returned diff list will describe how to go from the current collection object self to end_collection.

  • prefix: str — The name to use for this collection’s stream in the diff list. Default '.'.

  • holding_collection: Collection | None — A collection object used to hold objects for the returned diff list. By default, a new empty collection is created.

@must_be_writable
@synchronized
def apply( self, changes: List[Union[Tuple[Literal['add', 'del'], str, Collection], Tuple[Literal['mod', 'tok'], str, Collection, Collection]]]) -> None:
@must_be_writable
@synchronized
def apply(self, changes: ChangeList) -> None:
    """Apply a list of changes to this collection

    This method takes a list of changes generated by
    `RichCollectionBase.diff` and applies it to this
    collection. Afterward, the state of this collection object will
    match the state of `end_collection` passed to `diff`. If a change
    conflicts with a local change, it will be saved to an alternate path
    indicating the conflict.

    Arguments:

    * changes: arvados.collection.ChangeList --- The list of differences
      generated by `RichCollectionBase.diff`.
    """
    if changes:
        self.set_committed(False)
    for change in changes:
        event_type = change[0]
        path = change[1]
        initial = change[2]
        local = self.find(path)
        # Alternate path used when a change conflicts with local state; it
        # embeds the current UTC time at one-second resolution.
        conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                time.gmtime()))
        if event_type == ADD:
            if local is None:
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
            elif local != initial:
                # There is already a local file and it is different:
                # save change to conflict file.
                self.copy(initial, conflictpath)
            # else: an identical item is already present; nothing to do.
        elif event_type == MOD or event_type == TOK:
            final = change[3]
            if local == initial:
                # Local matches the "initial" item so it has not
                # changed locally and is safe to update.
                if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                    # Replace contents of local file with new contents
                    local.replace_contents(final)
                else:
                    # Overwrite path with new item; this can happen if
                    # path was a file and is now a collection or vice versa
                    self.copy(final, path, overwrite=True)
            else:
                # Local is missing (presumably deleted) or local doesn't
                # match the "start" value, so save change to conflict file
                self.copy(final, conflictpath)
        elif event_type == DEL:
            if local == initial:
                # Local item matches "initial" value, so it is safe to remove.
                self.remove(path, recursive=True)
            # else, the file is modified or already removed, in either
            # case we don't want to try to remove it.

Apply a list of changes to this collection

This method takes a list of changes generated by RichCollectionBase.diff and applies it to this collection. Afterward, the state of this collection object will match the state of end_collection passed to diff. If a change conflicts with a local change, it will be saved to an alternate path indicating the conflict.

Arguments:

  • changes: ChangeList — The list of differences generated by RichCollectionBase.diff.

def portable_data_hash(self) -> str:
def portable_data_hash(self) -> str:
    """Return the portable data hash of this collection's manifest

    When this instance is tied to an API record and has no uncommitted
    changes, the hash reported by the API server is returned. Otherwise
    the hash is computed locally as `<md5 hex digest>+<byte length>` of
    the portable manifest text.
    """
    if not (self._manifest_locator and self.committed()):
        manifest_bytes = self.portable_manifest_text().encode()
        digest = hashlib.md5(manifest_bytes).hexdigest()
        return '{}+{}'.format(digest, len(manifest_bytes))
    # Saved on the API server and committed: trust the server's value.
    return self._portable_data_hash

Get the portable data hash for this collection’s manifest

@synchronized
def subscribe( self, callback: Callable[[Literal['add', 'del', 'mod', 'tok'], Collection, str, Union[arvados.arvfile.ArvadosFile, Collection]], object]) -> None:
@synchronized
def subscribe(self, callback: ChangeCallback) -> None:
    """Register a callback to be notified of changes to this collection

    Only one callback may be registered at a time.

    Arguments:

    * callback: arvados.collection.ChangeCallback --- The callable invoked
      each time the collection is changed.

    Raises `arvados.errors.ArgumentError` if a callback is already set.
    """
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback

Set a notify callback for changes to this collection

Arguments:

  • callback: ChangeCallback — The callable to call each time the collection is changed.

@synchronized
def unsubscribe(self) -> None:
@synchronized
def unsubscribe(self) -> None:
    """Drop the change-notification callback registered on this collection, if any"""
    if self._callback is None:
        # Nothing registered; leave state untouched.
        return
    self._callback = None

Remove any notify callback set for changes to this collection

@synchronized
def notify( self, event: Literal['add', 'del', 'mod', 'tok'], collection: RichCollectionBase, name: str, item: Union[arvados.arvfile.ArvadosFile, Collection]) -> None:
 967    @synchronized
 968    def notify(
 969            self,
 970            event: ChangeType,
 971            collection: 'RichCollectionBase',
 972            name: str,
 973            item: CollectionItem,
 974    ) -> None:
 975        """Notify any subscribed callback about a change to this collection
 976
 977        .. ATTENTION:: Internal
 978           This method is only meant to be used by other Collection methods.
 979
 980        If a callback has been registered with `RichCollectionBase.subscribe`,
 981        it will be called with information about a change to this collection.
 982        Then this notification will be propagated to this collection's root.
 983
 984        Arguments:
 985
 986        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 987          the collection.
 988
 989        * collection: arvados.collection.RichCollectionBase --- The
 990          collection that was modified.
 991
 992        * name: str --- The name of the file or stream within `collection` that
 993          was modified.
 994
 995        * item: arvados.arvfile.ArvadosFile |
 996          arvados.collection.Subcollection --- The new contents at `name`
 997          within `collection`.
 998        """
 999        if self._callback:
1000            self._callback(event, collection, name, item)
1001        self.root_collection().notify(event, collection, name, item)

Notify any subscribed callback about a change to this collection

If a callback has been registered with RichCollectionBase.subscribe, it will be called with information about a change to this collection. Then this notification will be propagated to this collection’s root.

Arguments:

  • event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.

  • collection: RichCollectionBase — The collection that was modified.

  • name: str — The name of the file or stream within collection that was modified.

  • item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at name within collection.

@synchronized
def flush(self) -> None:
@synchronized
def flush(self) -> None:
    """Push any buffered data for this collection's items to Keep

    Calls `flush()` on every item returned by `self.values()`.
    """
    children = self.values()
    for child in children:
        child.flush()

Upload any pending data to Keep

Inherited Members
CollectionBase
stripped_manifest
class Collection(RichCollectionBase):
1030class Collection(RichCollectionBase):
1031    """Read and manipulate an Arvados collection
1032
1033    This class provides a high-level interface to create, read, and update
1034    Arvados collections and their contents. Refer to the Arvados Python SDK
1035    cookbook for [an introduction to using the Collection class][cookbook].
1036
1037    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1038    """
1039
def __init__(self, manifest_locator_or_text: Optional[str]=None,
             api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
             keep_client: Optional['arvados.keep.KeepClient']=None,
             num_retries: int=10,
             parent: Optional['Collection']=None,
             apiconfig: Optional[Mapping[str, str]]=None,
             block_manager: Optional['arvados.arvfile._BlockManager']=None,
             replication_desired: Optional[int]=None,
             storage_classes_desired: Optional[List[str]]=None,
             put_threads: Optional[int]=None):
    """Initialize a Collection object

    Arguments:

    * manifest_locator_or_text: str | None --- This string can contain a
      collection manifest text, portable data hash, or UUID. When given a
      portable data hash or UUID, this instance will load a collection
      record from the API server. Otherwise, this instance will represent a
      new collection without an API server record. The default value `None`
      instantiates a new collection with an empty manifest.

    * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
      Arvados API client object this instance uses to make requests. If
      none is given, this instance creates its own client using the
      settings from `apiconfig` (see below). If your client instantiates
      many Collection objects, you can help limit memory utilization by
      calling `arvados.api.api` to construct an
      `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
      for every Collection.

    * keep_client: arvados.keep.KeepClient | None --- The Keep client
      object this instance uses to make requests. If none is given, this
      instance creates its own client using its `api_client`.

    * num_retries: int --- The number of times that client requests are
      retried. Default 10.

    * parent: arvados.collection.Collection | None --- The parent Collection
      object of this instance, if any. This argument is primarily used by
      other Collection methods; user client code shouldn't need to use it.

    * apiconfig: Mapping[str, str] | None --- A mapping with entries for
      `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
      `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
      Collection object constructs one from these settings. If no
      mapping is provided, calls `arvados.config.settings` to get these
      parameters from user configuration.

    * block_manager: arvados.arvfile._BlockManager | None --- The
      _BlockManager object used by this instance to coordinate reading
      and writing Keep data blocks. If none is given, this instance
      constructs its own. This argument is primarily used by other
      Collection methods; user client code shouldn't need to use it.

    * replication_desired: int | None --- This controls both the value of
      the `replication_desired` field on API collection records saved by
      this class, as well as the number of Keep services that the object
      writes new data blocks to. If none is given, uses the default value
      configured for the cluster.

    * storage_classes_desired: list[str] | None --- This controls both
      the value of the `storage_classes_desired` field on API collection
      records saved by this class, as well as selecting which specific
      Keep services the object writes new data blocks to. If none is
      given, defaults to an empty list.

    * put_threads: int | None --- The number of threads to run
      simultaneously to upload data blocks to Keep. This value is used when
      building a new `block_manager`. It is unused when a `block_manager`
      is provided.

    Raises `arvados.errors.ArgumentError` if `storage_classes_desired` is
    not a list, if `manifest_locator_or_text` is not recognized as a
    manifest, portable data hash, or collection UUID, or if the manifest
    text cannot be parsed.
    """

    if storage_classes_desired and type(storage_classes_desired) is not list:
        raise errors.ArgumentError("storage_classes_desired must be list type.")

    super(Collection, self).__init__(parent)
    self._api_client = api_client
    self._keep_client = keep_client

    # Use the keep client from ThreadSafeAPIClient
    if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
        self._keep_client = self._api_client.keep

    self._block_manager = block_manager
    self.replication_desired = replication_desired
    self._storage_classes_desired = storage_classes_desired
    self.put_threads = put_threads

    if apiconfig:
        self._config = apiconfig
    else:
        self._config = config.settings()

    self.num_retries = num_retries
    self._manifest_locator = None
    self._manifest_text = None
    self._portable_data_hash = None
    self._api_response = None
    self._past_versions = set()

    self.lock = threading.RLock()
    self.events = None

    if manifest_locator_or_text:
        if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
            # Portable data hash: the record is loaded in _populate().
            self._manifest_locator = manifest_locator_or_text
        elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            # A UUID whose prefix is not this cluster's is flagged so that
            # save() copies its blocks locally first.
            if not self._has_local_collection_uuid():
                self._has_remote_blocks = True
        elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            # A "+R" locator hint is treated as a reference to remote blocks.
            if '+R' in self._manifest_text:
                self._has_remote_blocks = True
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader is not a manifest or a collection UUID")

        try:
            self._populate()
        except errors.SyntaxError as e:
            # Interpolate the detail into the message here: exceptions do not
            # format their arguments like logging calls, so passing str(e)
            # as a second argument would leave a literal "%s" in the text.
            raise errors.ArgumentError("Error processing manifest text: %s" % (str(e),)) from None
1162
def storage_classes_desired(self) -> List[str]:
    """Return this collection's `storage_classes_desired` list (empty when unset)"""
    desired = self._storage_classes_desired
    return desired if desired else []
1166
def root_collection(self) -> 'Collection':
    """Return the root collection for this object

    A `Collection` always reports itself as the root, regardless of any
    `parent` passed to its constructor.
    """
    root = self
    return root
1169
def get_properties(self) -> Properties:
    """Return this collection's properties as a dict

    An empty dict is returned when there is no associated API record, or
    when that record's `properties` value is empty.
    """
    response = self._api_response
    if not response or not response["properties"]:
        return {}
    return response["properties"]
1181
def get_trash_at(self) -> Optional[datetime.datetime]:
    """Return this collection's `trash_at` field as a datetime

    Returns None when there is no associated API record, when `trash_at`
    is empty, or when its value cannot be parsed.
    """
    response = self._api_response
    if not (response and response["trash_at"]):
        return None
    try:
        return ciso8601.parse_datetime(response["trash_at"])
    except ValueError:
        return None
1197
def stream_name(self) -> str:
    """Return the manifest stream name for this collection's top level"""
    top_level_stream = "."
    return top_level_stream
1200
def writable(self) -> bool:
    """Report whether this collection may be modified (always `True` here)"""
    is_writable = True
    return is_writable
1203
@synchronized
def known_past_version(
        self,
        modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
) -> bool:
    """Tell whether an API record version was already loaded by this object

    Each API record this instance remembers contributes its
    (`modified_at`, `portable_data_hash`) pair to a set of seen versions.
    This method returns `True` when the given 2-tuple is in that set.
    """
    seen_versions = self._past_versions
    return modified_at_and_portable_data_hash in seen_versions
1217
@synchronized
@retry_method
def update(
        self,
        other: Optional['Collection']=None,
        num_retries: Optional[int]=None,
) -> None:
    """Merge another collection's contents into this one

    The manifest this instance was last synchronized with is diffed
    against `other`, and the resulting changes are applied to this
    instance. Files that conflict with local modifications are renamed to
    flag the conflict.

    With no arguments, this collection's API record is reloaded and any
    server-side changes are merged in. That requires an associated API
    record; without one, `arvados.errors.ArgumentError` is raised.

    Arguments:

    * other: arvados.collection.Collection | None --- The collection whose
      contents are merged into this instance. When omitted, this
      collection's API record is fetched and a Collection object is built
      from it. If this instance has no corresponding API record, this
      method raises `arvados.errors.ArgumentError`.

    * num_retries: int | None --- How many times to retry fetching the
      collection's API record. Defaults to the `num_retries` given to
      this instance's constructor.
    """
    if other is None:
        if self._manifest_locator is None:
            raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
        response = (self._my_api()
                    .collections()
                    .get(uuid=self._manifest_locator)
                    .execute(num_retries=num_retries))
        server_pdh = response.get("portable_data_hash")
        # A server record we have seen before has already been merged, so
        # skip it -- unless it matches our current content, in which case
        # proceed anyway so the record (and its tokens) are refreshed.
        already_merged = (
            self.known_past_version((response.get("modified_at"), server_pdh))
            and server_pdh != self.portable_data_hash()
        )
        if already_merged:
            return
        self._remember_api_response(response)
        other = CollectionReader(response["manifest_text"])
    baseline = CollectionReader(self._manifest_text)
    changes = baseline.diff(other)
    self.apply(changes)
    self._manifest_text = self.manifest_text()
1266
@synchronized
def _my_api(self):
    """Return this collection's API client, building it on first use

    A newly built client is a `ThreadSafeAPIClient` created from
    `self._config`. If no Keep client has been set yet, the new API
    client's `keep` attribute is adopted as the Keep client too.
    """
    if self._api_client is not None:
        return self._api_client
    self._api_client = ThreadSafeAPIClient(self._config, version='v1')
    if self._keep_client is None:
        self._keep_client = self._api_client.keep
    return self._api_client
1274
@synchronized
def _my_keep(self):
    """Return this collection's Keep client, building it on first use"""
    if self._keep_client is not None:
        return self._keep_client
    if self._api_client is None:
        # Building the API client also installs its Keep client.
        self._my_api()
    else:
        self._keep_client = KeepClient(api_client=self._api_client)
    return self._keep_client
1283
@synchronized
def _my_block_manager(self):
    """Return this collection's _BlockManager, building it on first use

    The replication count is `replication_desired` when it is truthy;
    otherwise it is the cluster's `defaultCollectionReplication` from the
    API discovery document, falling back to 2.
    """
    if self._block_manager is not None:
        return self._block_manager
    copies = self.replication_desired
    if not copies:
        copies = self._my_api()._rootDesc.get('defaultCollectionReplication', 2)
    self._block_manager = _BlockManager(
        self._my_keep(),
        copies=copies,
        put_threads=self.put_threads,
        num_retries=self.num_retries,
        storage_classes_func=self.storage_classes_desired,
    )
    return self._block_manager
1296
def _remember_api_response(self, response):
    """Store `response` as the current API record and mark its version as seen"""
    self._api_response = response
    version_key = (response.get("modified_at"), response.get("portable_data_hash"))
    self._past_versions.add(version_key)
1300
def _populate_from_api_server(self):
    """Load this collection's API record and adopt its manifest and settings

    The API client is created lazily by `_my_api`, as late as possible, so
    clients without API server access are not tripped up earlier. Any
    error from building the client or fetching the record propagates to
    the caller; on success nothing is returned.
    """
    request = self._my_api().collections().get(uuid=self._manifest_locator)
    record = request.execute(num_retries=self.num_retries)
    self._remember_api_response(record)
    self._manifest_text = self._api_response['manifest_text']
    self._portable_data_hash = self._api_response['portable_data_hash']
    # Values passed to the constructor win; only fill in settings that
    # were left unset.
    if self.replication_desired is None:
        self.replication_desired = self._api_response.get('replication_desired', None)
    if self._storage_classes_desired is None:
        self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1320
def _populate(self):
    """Load and import this collection's manifest, if there is one

    Does nothing when there is neither manifest text nor a locator. When
    only a locator is known, the manifest is first fetched from the API
    server.
    """
    if self._manifest_text is None and self._manifest_locator is None:
        return
    if self._manifest_text is None:
        self._populate_from_api_server()
    self._baseline_manifest = self._manifest_text
    self._import_manifest(self._manifest_text)
1329
def _has_collection_uuid(self):
    """Return a truthy value when the manifest locator is a collection UUID"""
    locator = self._manifest_locator
    if locator is None:
        return False
    return re.match(arvados.util.collection_uuid_pattern, locator)
1332
def _has_local_collection_uuid(self):
    """Tell whether the manifest locator is a collection UUID from this cluster

    Returns False when the locator is not a collection UUID at all.
    Otherwise it compares the UUID's first dash-separated component with
    the cluster's `uuidPrefix` from the API discovery document.
    """
    # _has_collection_uuid must be *called*: the bound method object itself
    # is always truthy, which would let a None locator reach .split() below.
    if not self._has_collection_uuid():
        return False
    local_prefix = self._my_api()._rootDesc['uuidPrefix']
    return local_prefix == self._manifest_locator.split('-')[0]
1336
def __enter__(self):
    """Enter a context with this collection; the collection itself is bound by `as`"""
    context_value = self
    return context_value
1339
def __exit__(self, exc_type, exc_value, traceback):
    """Exit a context with this collection instance

    If no exception was raised inside the context block, and this
    collection is writable and has a corresponding API record, that
    record will be updated to match the state of this instance at the end
    of the block. Background Keep threads are stopped in every case, even
    if that save fails. Exceptions are never suppressed.
    """
    try:
        if exc_type is None:
            if self.writable() and self._has_collection_uuid():
                self.save()
    finally:
        # Always release the background threads, even if save() raised,
        # so a failed save does not leak them.
        self.stop_threads()
1352
def stop_threads(self) -> None:
    """Stop the background Keep upload/download threads of this collection

    Does nothing if no block manager has been set up.
    """
    manager = self._block_manager
    if manager is None:
        return
    manager.stop_threads()
1357
@synchronized
def manifest_locator(self) -> Optional[str]:
    """Return this collection's manifest locator, if it has one

    * When this instance is associated with an API record by UUID, that
      UUID is returned.
    * Otherwise, when it was loaded from an API record by portable data
      hash, that hash is returned.
    * Otherwise the result is `None`.
    """
    locator = self._manifest_locator
    return locator
1369
@synchronized
def clone(
        self,
        new_parent: Optional['Collection']=None,
        new_name: Optional[str]=None,
        readonly: bool=False,
        new_config: Optional[Mapping[str, str]]=None,
) -> 'Collection':
    """Build a new Collection object holding the same contents as this one

    The copy is not associated with any API record.

    Arguments:

    * new_parent: arvados.collection.Collection | None --- Passed to the
      new object's constructor as `parent`.

    * new_name: str | None --- Ignored.

    * readonly: bool --- When true, a `CollectionReader` is returned;
      otherwise a mutable `Collection`. Default `False`.

    * new_config: Mapping[str, str] | None --- Passed to the new object's
      constructor as `apiconfig`. Defaults to this instance's own
      configuration.
    """
    clone_config = self._config if new_config is None else new_config
    factory = CollectionReader if readonly else Collection
    newcollection = factory(parent=new_parent, apiconfig=clone_config)
    newcollection._clonefrom(self)
    return newcollection
1409
@synchronized
def api_response(self) -> Optional[Dict[str, Any]]:
    """Return the API record associated with this instance, or `None` if there is none"""
    record = self._api_response
    return record
1418
def find_or_create(
        self,
        path: str,
        create_type: CreateType,
) -> CollectionItem:
    """Find the item at `path`, creating it with `create_type` if missing

    `"."` refers to this collection itself. A leading `"./"` is stripped
    before the lookup is delegated to the parent class.
    """
    if path == ".":
        return self
    relative_path = path[2:] if path.startswith("./") else path
    return super(Collection, self).find_or_create(relative_path, create_type)
1428
def find(self, path: str) -> CollectionItem:
    """Look up the item at `path`

    `"."` refers to this collection itself. A leading `"./"` is stripped
    before the lookup is delegated to the parent class.
    """
    if path == ".":
        return self
    relative_path = path[2:] if path.startswith("./") else path
    return super(Collection, self).find(relative_path)
1434
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the item at `path`

    The collection root `"."` cannot be removed and raises
    `arvados.errors.ArgumentError`. A leading `"./"` is stripped before
    delegating to the parent class.
    """
    if path == ".":
        raise errors.ArgumentError("Cannot remove '.'")
    relative_path = path[2:] if path.startswith("./") else path
    return super(Collection, self).remove(relative_path, recursive)
1440
@must_be_writable
@synchronized
@retry_method
def save(
        self,
        properties: Optional[Properties]=None,
        storage_classes: Optional[StorageClasses]=None,
        trash_at: Optional[datetime.datetime]=None,
        merge: bool=True,
        num_retries: Optional[int]=None,
        preserve_version: bool=False,
) -> str:
    """Save collection to an existing API record

    This method updates the instance's corresponding API record to match
    the instance's state. If this instance does not have a corresponding API
    record yet, raises `AssertionError`. (To create a new API record, use
    `Collection.save_new`.) This method returns the saved collection
    manifest.

    Arguments:

    * properties: dict[str, Any] | None --- If provided, the API record will
      be updated with these properties. Note this will completely replace
      any existing properties.

    * storage_classes: list[str] | None --- If provided, the API record will
      be updated with this value in the `storage_classes_desired` field.
      This value will also be saved on the instance and used for any
      changes that follow.

    * trash_at: datetime.datetime | None --- If provided, the API record
      will be updated with this value in the `trash_at` field. A
      timezone-aware value is converted to UTC first; a naive value is
      sent unchanged, i.e. it is interpreted as UTC.

    * merge: bool --- If `True` (the default), this method will first
      reload this collection's API record, and merge any new contents into
      this instance before saving changes. See `Collection.update` for
      details.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.

    * preserve_version: bool --- This value will be passed directly
      to the underlying API call. If `True`, the Arvados API will
      preserve the versions of this collection both immediately before
      and after the update. If `True` when the API server is not
      configured with collection versioning, this method raises
      `arvados.errors.ArgumentError`.
    """
    if properties and type(properties) is not dict:
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and type(storage_classes) is not list:
        raise errors.ArgumentError("storage_classes must be list type.")

    if trash_at and type(trash_at) is not datetime.datetime:
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    # Only remember new storage classes once every argument has been
    # validated, so a rejected call leaves this instance unchanged.
    if storage_classes:
        self._storage_classes_desired = storage_classes

    body={}
    if properties:
        body["properties"] = properties
    if self.storage_classes_desired():
        body["storage_classes_desired"] = self.storage_classes_desired()
    if trash_at:
        # The timestamp is sent with a literal "Z" (UTC) suffix, so an aware
        # datetime must be converted to UTC before formatting.
        if trash_at.utcoffset() is not None:
            trash_at = trash_at.astimezone(datetime.timezone.utc)
        body["trash_at"] = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    if preserve_version:
        body["preserve_version"] = preserve_version

    if not self.committed():
        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False
        if not self._has_collection_uuid():
            raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
        elif not self._has_local_collection_uuid():
            raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

        # Finish uploading pending data blocks before sending the manifest.
        self._my_block_manager().commit_all()

        if merge:
            self.update()

        text = self.manifest_text(strip=False)
        body['manifest_text'] = text

        self._remember_api_response(self._my_api().collections().update(
            uuid=self._manifest_locator,
            body=body
            ).execute(num_retries=num_retries))
        self._manifest_text = self._api_response["manifest_text"]
        self._portable_data_hash = self._api_response["portable_data_hash"]
        self.set_committed(True)
    elif body:
        # Contents are already committed; only record metadata changed.
        self._remember_api_response(self._my_api().collections().update(
            uuid=self._manifest_locator,
            body=body
            ).execute(num_retries=num_retries))

    return self._manifest_text
1548
1549
@must_be_writable
@synchronized
@retry_method
def save_new(
        self,
        name: Optional[str]=None,
        create_collection_record: bool=True,
        owner_uuid: Optional[str]=None,
        properties: Optional[Properties]=None,
        storage_classes: Optional[StorageClasses]=None,
        trash_at: Optional[datetime.datetime]=None,
        ensure_unique_name: bool=False,
        num_retries: Optional[int]=None,
        preserve_version: bool=False,
):
    """Save collection to a new API record

    This method finishes uploading new data blocks and (optionally)
    creates a new API collection record with the provided data. If a new
    record is created, this instance becomes associated with that record
    for future updates like `save()`. This method returns the saved
    collection manifest.

    Arguments:

    * name: str | None --- The `name` field to use on the new collection
      record. If not specified, the name "New collection" is used and
      `ensure_unique_name` is forced to `True`.

    * create_collection_record: bool --- If `True` (the default), creates a
      collection record on the API server. If `False`, the method finishes
      all data uploads and only returns the resulting collection manifest
      without sending it to the API server.

    * owner_uuid: str | None --- The `owner_uuid` field to use on the
      new collection record.

    * properties: dict[str, Any] | None --- The `properties` field to use on
      the new collection record.

    * storage_classes: list[str] | None --- The
      `storage_classes_desired` field to use on the new collection record.
      When given, it also replaces this instance's desired storage
      classes, even if `create_collection_record` is `False`.

    * trash_at: datetime.datetime | None --- The `trash_at` field to use
      on the new collection record. Timezone-aware values are converted
      to UTC before being sent; naive values are sent unchanged and are
      labeled as UTC.

    * ensure_unique_name: bool --- This value is passed directly to the
      Arvados API when creating the collection record. If `True`, the API
      server may modify the submitted `name` to ensure the collection's
      `name`+`owner_uuid` combination is unique. If `False` (the default),
      if a collection already exists with this same `name`+`owner_uuid`
      combination, creating a collection record will raise a validation
      error.

    * num_retries: int | None --- The number of times to retry the API
      request that creates the collection record. If not specified, uses
      the `num_retries` provided when this instance was constructed.

    * preserve_version: bool --- This value is passed directly to the
      underlying API call. If `True`, the Arvados API will preserve the
      versions of this collection both immediately before and after the
      update. If `True` when the API server is not configured with
      collection versioning, this method raises
      `arvados.errors.ArgumentError`.

    Raises `arvados.errors.ArgumentError` if `properties` is not a dict,
    `storage_classes` is not a list, `trash_at` is not a
    `datetime.datetime`, or `preserve_version` is requested while
    collection versioning is disabled.
    """
    if properties and type(properties) is not dict:
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and type(storage_classes) is not list:
        raise errors.ArgumentError("storage_classes must be list type.")

    if trash_at and type(trash_at) is not datetime.datetime:
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    if self._has_remote_blocks:
        # Copy any remote blocks to the local cluster.
        self._copy_remote_blocks(remote_blocks={})
        self._has_remote_blocks = False

    if storage_classes:
        self._storage_classes_desired = storage_classes

    self._my_block_manager().commit_all()
    text = self.manifest_text(strip=False)

    if create_collection_record:
        if name is None:
            name = "New collection"
            ensure_unique_name = True

        body = {"manifest_text": text,
                "name": name,
                "replication_desired": self.replication_desired}
        if owner_uuid:
            body["owner_uuid"] = owner_uuid
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            if trash_at.utcoffset() is not None:
                # The format below hard-codes a "Z" (UTC) suffix, so an
                # aware datetime must be converted to UTC first; otherwise
                # its local wall-clock time would be mislabeled as UTC.
                trash_at = trash_at.astimezone(datetime.timezone.utc)
            body["trash_at"] = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        if preserve_version:
            body["preserve_version"] = preserve_version

        self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
        text = self._api_response["manifest_text"]

        self._manifest_locator = self._api_response["uuid"]
        self._portable_data_hash = self._api_response["portable_data_hash"]

        self._manifest_text = text
        self.set_committed(True)

    return text
1667
1668    _token_re = re.compile(r'(\S+)(\s+|$)')
1669    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1670    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1671
1672    def _unescape_manifest_path(self, path):
1673        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1674
@synchronized
def _import_manifest(self, manifest_text):
    """Import a manifest into a `Collection`.

    :manifest_text:
      The manifest text to import from.

    Each manifest line is parsed as a stream name, followed by block
    locators, followed by file segments (`position:size:name`). A file
    segment whose final path component is `.` is treated as a placeholder
    that only creates a directory, not a file.

    Raises `arvados.errors.ArgumentError` if this collection is not empty.
    Raises `arvados.errors.SyntaxError` if a token that should be a file
    segment does not match the segment format, or if a file path conflicts
    with an existing file or stream of the same name.
    """
    if len(self) > 0:
        raise errors.ArgumentError("Can only import manifest into an empty collection")

    # Parser states for the tokens of one manifest line.
    STREAM_NAME = 0
    BLOCKS = 1
    SEGMENTS = 2

    stream_name = None
    state = STREAM_NAME

    for token_and_separator in self._token_re.finditer(manifest_text):
        tok = token_and_separator.group(1)
        sep = token_and_separator.group(2)

        if state == STREAM_NAME:
            # starting a new stream
            stream_name = self._unescape_manifest_path(tok)
            blocks = []
            streamoffset = 0
            state = BLOCKS
            self.find_or_create(stream_name, COLLECTION)
            continue

        if state == BLOCKS:
            block_locator = self._block_re.match(tok)
            if block_locator:
                blocksize = int(block_locator.group(1))
                # Record the offset at which this block starts within the
                # concatenation of the stream's blocks.
                blocks.append(streams.Range(tok, streamoffset, blocksize, 0))
                streamoffset += blocksize
            else:
                # First non-locator token: fall through and parse it as a
                # file segment below.
                state = SEGMENTS

        if state == SEGMENTS:
            file_segment = self._segment_re.match(tok)
            if file_segment:
                pos = int(file_segment.group(1))
                size = int(file_segment.group(2))
                name = self._unescape_manifest_path(file_segment.group(3))
                if name.split('/')[-1] == '.':
                    # placeholder for persisting an empty directory, not a real file
                    if len(name) > 2:
                        self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                else:
                    filepath = os.path.join(stream_name, name)
                    try:
                        afile = self.find_or_create(filepath, FILE)
                    except IOError as e:
                        if e.errno == errno.ENOTDIR:
                            raise errors.SyntaxError("Dir part of %s conflicts with file of the same name." % filepath) from None
                        else:
                            raise e from None
                    if isinstance(afile, ArvadosFile):
                        afile.add_segment(blocks, pos, size)
                    else:
                        raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
            else:
                # error!
                raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

        if sep == "\n":
            # A newline ends the manifest line; the next token names a new stream.
            stream_name = None
            state = STREAM_NAME

    self.set_committed(True)
1748
@synchronized
def notify(
        self,
        event: ChangeType,
        collection: 'RichCollectionBase',
        name: str,
        item: CollectionItem,
) -> None:
    """Pass a change notification on to this collection's callback

    When `self._callback` is set (truthy), it is called with the same four
    arguments. Otherwise the notification is dropped.
    """
    callback = self._callback
    if not callback:
        return
    callback(event, collection, name, item)

Read and manipulate an Arvados collection

This class provides a high-level interface to create, read, and update Arvados collections and their contents. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.

Collection(manifest_locator_or_text: Optional[str] = None, api_client: Optional[arvados.api_resources.ArvadosAPIClient] = None, keep_client: Optional[arvados.keep.KeepClient] = None, num_retries: int = 10, parent: Optional[Collection] = None, apiconfig: Optional[Mapping[str, str]] = None, block_manager: Optional[arvados.arvfile._BlockManager] = None, replication_desired: Optional[int] = None, storage_classes_desired: Optional[List[str]] = None, put_threads: Optional[int] = None)
def __init__(self, manifest_locator_or_text: Optional[str]=None,
             api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
             keep_client: Optional['arvados.keep.KeepClient']=None,
             num_retries: int=10,
             parent: Optional['Collection']=None,
             apiconfig: Optional[Mapping[str, str]]=None,
             block_manager: Optional['arvados.arvfile._BlockManager']=None,
             replication_desired: Optional[int]=None,
             storage_classes_desired: Optional[List[str]]=None,
             put_threads: Optional[int]=None):
    """Initialize a Collection object

    Arguments:

    * manifest_locator_or_text: str | None --- This string can contain a
      collection manifest text, portable data hash, or UUID. When given a
      portable data hash or UUID, this instance will load a collection
      record from the API server. Otherwise, this instance will represent a
      new collection without an API server record. The default value `None`
      instantiates a new collection with an empty manifest.

    * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
      Arvados API client object this instance uses to make requests. If
      none is given, this instance creates its own client using the
      settings from `apiconfig` (see below). If your client instantiates
      many Collection objects, you can help limit memory utilization by
      calling `arvados.api.api` to construct an
      `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
      for every Collection.

    * keep_client: arvados.keep.KeepClient | None --- The Keep client
      object this instance uses to make requests. If none is given and
      `api_client` is an `arvados.api.ThreadSafeAPIClient`, that client's
      Keep client is used; otherwise this instance creates its own client
      using its `api_client`.

    * num_retries: int --- The number of times that client requests are
      retried. Default 10.

    * parent: arvados.collection.Collection | None --- The parent Collection
      object of this instance, if any. This argument is primarily used by
      other Collection methods; user client code shouldn't need to use it.

    * apiconfig: Mapping[str, str] | None --- A mapping with entries for
      `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
      `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
      Collection object constructs one from these settings. If no
      mapping is provided, calls `arvados.config.settings` to get these
      parameters from user configuration.

    * block_manager: arvados.arvfile._BlockManager | None --- The
      _BlockManager object used by this instance to coordinate reading
      and writing Keep data blocks. If none is given, this instance
      constructs its own. This argument is primarily used by other
      Collection methods; user client code shouldn't need to use it.

    * replication_desired: int | None --- This controls both the value of
      the `replication_desired` field on API collection records saved by
      this class, as well as the number of Keep services that the object
      writes new data blocks to. If none is given, uses the default value
      configured for the cluster.

    * storage_classes_desired: list[str] | None --- This controls both
      the value of the `storage_classes_desired` field on API collection
      records saved by this class, as well as selecting which specific
      Keep services the object writes new data blocks to. If none is
      given, defaults to an empty list.

    * put_threads: int | None --- The number of threads to run
      simultaneously to upload data blocks to Keep. This value is used when
      building a new `block_manager`. It is unused when a `block_manager`
      is provided.

    Raises `arvados.errors.ArgumentError` if `storage_classes_desired` is
    not a list, if `manifest_locator_or_text` is not recognized as a
    portable data hash, collection UUID, or manifest, or if loading the
    collection fails with `arvados.errors.SyntaxError`.
    """

    if storage_classes_desired and type(storage_classes_desired) is not list:
        raise errors.ArgumentError("storage_classes_desired must be list type.")

    super(Collection, self).__init__(parent)
    self._api_client = api_client
    self._keep_client = keep_client

    # Use the keep client from ThreadSafeAPIClient
    if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
        self._keep_client = self._api_client.keep

    self._block_manager = block_manager
    self.replication_desired = replication_desired
    self._storage_classes_desired = storage_classes_desired
    self.put_threads = put_threads

    if apiconfig:
        self._config = apiconfig
    else:
        self._config = config.settings()

    self.num_retries = num_retries
    self._manifest_locator = None
    self._manifest_text = None
    self._portable_data_hash = None
    self._api_response = None
    self._past_versions = set()

    self.lock = threading.RLock()
    self.events = None

    if manifest_locator_or_text:
        if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
        elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            if not self._has_local_collection_uuid():
                self._has_remote_blocks = True
        elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            if '+R' in self._manifest_text:
                self._has_remote_blocks = True
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader is not a manifest or a collection UUID")

        try:
            self._populate()
        except errors.SyntaxError as e:
            # Interpolate the parser's message; passing it as a second
            # exception argument would leave "%s" unformatted.
            raise errors.ArgumentError("Error processing manifest text: %s" % str(e)) from None

Initialize a Collection object

Arguments:

  • manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value None instantiates a new collection with an empty manifest.

  • api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from apiconfig (see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling arvados.api.api to construct an arvados.api.ThreadSafeAPIClient, and use that as the api_client for every Collection.

  • keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its api_client.

  • num_retries: int — The number of times that client requests are retried. Default 10.

  • parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • apiconfig: Mapping[str, str] | None — A mapping with entries for ARVADOS_API_HOST, ARVADOS_API_TOKEN, and optionally ARVADOS_API_HOST_INSECURE. When no api_client is provided, the Collection object constructs one from these settings. If no mapping is provided, calls arvados.config.settings to get these parameters from user configuration.

  • block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • replication_desired: int | None — This controls both the value of the replication_desired field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.

  • storage_classes_desired: list[str] | None — This controls both the value of the storage_classes_desired field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.

  • put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new block_manager. It is unused when a block_manager is provided.

replication_desired
put_threads
num_retries
lock
events
def storage_classes_desired(self) -> List[str]:
def storage_classes_desired(self) -> List[str]:
    """Return this collection's `storage_classes_desired` value.

    If no storage classes are stored (the value is `None` or otherwise
    empty), a fresh empty list is returned instead.
    """
    desired = self._storage_classes_desired
    if not desired:
        return []
    return desired

Get this collection’s storage_classes_desired value

def root_collection(self) -> Collection:
def root_collection(self) -> 'Collection':
    """Return the root collection object.

    A top-level `Collection` is its own root, so this returns `self`.
    """
    root = self
    return root

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def get_properties(self) -> Dict[str, Any]:
def get_properties(self) -> Properties:
    """Return the `properties` of this collection's API record.

    A dict is always returned: when there is no API record, or the
    record's `properties` value is empty, the result is a new empty dict.
    """
    record = self._api_response
    props = record["properties"] if record else None
    return props if props else {}

Get this collection’s properties

This method always returns a dict. If this collection object does not have an associated API record, or that record does not have any properties set, this method returns an empty dict.

def get_trash_at(self) -> Optional[datetime.datetime]:
def get_trash_at(self) -> Optional[datetime.datetime]:
    """Return the `trash_at` field of this collection's API record as a datetime.

    Returns `None` when there is no API record, when `trash_at` is empty,
    or when the stored value cannot be parsed.
    """
    record = self._api_response
    if not record:
        return None
    raw = record["trash_at"]
    if not raw:
        return None
    try:
        return ciso8601.parse_datetime(raw)
    except ValueError:
        return None

Get this collection’s trash_at field

This method parses the trash_at field of the collection’s API record and returns a datetime from it. If that field is not set, or this collection object does not have an associated API record, returns None.

def stream_name(self) -> str:
def stream_name(self) -> str:
    """Return the manifest stream name for this collection.

    The top level of a collection is always the stream named ".".
    """
    root_stream = "."
    return root_stream

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

def writable(self) -> bool:
def writable(self) -> bool:
    """Report whether this collection object can be modified.

    A `Collection` is always writable, so this returns `True`.
    """
    is_writable = True
    return is_writable

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

@synchronized
def known_past_version(self, modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]) -> bool:
@synchronized
def known_past_version(
        self,
        modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
) -> bool:
    """Check whether an API record version has already been loaded

    The argument is a (`modified_at`, `portable_data_hash`) pair. The
    result is `True` exactly when that pair is in the set of versions
    this object has recorded in `self._past_versions`.
    """
    seen_versions = self._past_versions
    return modified_at_and_portable_data_hash in seen_versions

Indicate whether an API record for this collection has been seen before

As this collection object loads records from the API server, it records their modified_at and portable_data_hash fields. This method accepts a 2-tuple with values for those fields, and returns True if the combination was previously loaded.

@synchronized
@retry_method
def update(self, other: Optional[Collection] = None, num_retries: Optional[int] = None) -> None:
@synchronized
@retry_method
def update(
        self,
        other: Optional['Collection']=None,
        num_retries: Optional[int]=None,
) -> None:
    """Merge changes from another collection into this instance

    A `CollectionReader` built from this instance's stored manifest text
    is diffed against `other`, and the resulting changes are applied to
    this instance, renaming files to flag conflicts where necessary.

    If `other` is not given, this instance's API record is fetched again
    and used as `other`. If that record's (`modified_at`,
    `portable_data_hash`) pair has been seen before and its portable data
    hash differs from the current one, it has already been merged and
    this method returns without changes. A record whose portable data
    hash matches the current one is still processed, so that tokens get
    refreshed.

    Arguments:

    * other: arvados.collection.Collection | None --- The collection whose
      contents are merged into this instance. If omitted, this instance's
      API record is reloaded; if this instance has no `manifest_locator`,
      `arvados.errors.ArgumentError` is raised.

    * num_retries: int | None --- How many times to retry reloading the
      API record. Defaults to the `num_retries` given when this instance
      was constructed.
    """
    if other is None:
        if self._manifest_locator is None:
            raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
        request = self._my_api().collections().get(uuid=self._manifest_locator)
        response = request.execute(num_retries=num_retries)
        fetched_pdh = response.get("portable_data_hash")
        already_merged = (
            self.known_past_version((response.get("modified_at"), fetched_pdh))
            and fetched_pdh != self.portable_data_hash()
        )
        if already_merged:
            # Seen before and different from our current record: skip it.
            return
        self._remember_api_response(response)
        other = CollectionReader(response["manifest_text"])
    baseline = CollectionReader(self._manifest_text)
    self.apply(baseline.diff(other))
    self._manifest_text = self.manifest_text()

Merge another collection’s contents into this one

This method compares the manifest of this collection instance with another, then updates this instance’s manifest with changes from the other, renaming files to flag conflicts where necessary.

When called without any arguments, this method reloads the collection’s API record, and updates this instance with any changes that have appeared server-side. If this instance does not have a corresponding API record, this method raises arvados.errors.ArgumentError.

Arguments:

  • other: Collection | None — The collection whose contents should be merged into this instance. When not provided, this method reloads this collection’s API record and constructs a Collection object from it. If this instance does not have a corresponding API record, this method raises arvados.errors.ArgumentError.

  • num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the num_retries provided when this instance was constructed.

def stop_threads(self) -> None:
def stop_threads(self) -> None:
    """Stop the block manager's background Keep upload/download threads.

    Does nothing when this instance has no block manager yet.
    """
    manager = self._block_manager
    if manager is None:
        return
    manager.stop_threads()

Stop background Keep upload/download threads

@synchronized
def manifest_locator(self) -> Optional[str]:
@synchronized
def manifest_locator(self) -> Optional[str]:
    """Return the locator this collection is associated with, if any

    This is the collection UUID when this instance is tied to an API
    record by UUID, or the portable data hash when the record was loaded
    by hash. When there is no associated record, the result is `None`.
    """
    locator = self._manifest_locator
    return locator

Get this collection’s manifest locator, if any

  • If this collection instance is associated with an API record with a UUID, return that.
  • Otherwise, if this collection instance was loaded from an API record by portable data hash, return that.
  • Otherwise, return None.
@synchronized
def clone(self, new_parent: Optional[Collection] = None, new_name: Optional[str] = None, readonly: bool = False, new_config: Optional[Mapping[str, str]] = None) -> Collection:
@synchronized
def clone(
        self,
        new_parent: Optional['Collection']=None,
        new_name: Optional[str]=None,
        readonly: bool=False,
        new_config: Optional[Mapping[str, str]]=None,
) -> 'Collection':
    """Make a new, unsaved collection object with this instance's contents

    The copy is not associated with any API record.

    Arguments:

    * new_parent: arvados.collection.Collection | None --- Passed to the
      new object's constructor as `parent`.

    * new_name: str | None --- Accepted for interface compatibility;
      ignored.

    * readonly: bool --- When true, the copy is a `CollectionReader`;
      otherwise it is a mutable `Collection`. Default `False`.

    * new_config: Mapping[str, str] | None --- Passed to the new object's
      constructor as `apiconfig`. When omitted, this instance's own
      configuration is reused.
    """
    apiconfig = self._config if new_config is None else new_config
    factory = CollectionReader if readonly else Collection
    copy = factory(parent=new_parent, apiconfig=apiconfig)
    copy._clonefrom(self)
    return copy

Create a Collection object with the same contents as this instance

This method creates a new Collection object with contents that match this instance’s. The new collection will not be associated with any API record.

Arguments:

  • new_parent: Collection | None — This value is passed to the new Collection’s constructor as the parent argument.

  • new_name: str | None — This value is unused.

  • readonly: bool — If this value is true, this method constructs and returns a CollectionReader. Otherwise, it returns a mutable Collection. Default False.

  • new_config: Mapping[str, str] | None — This value is passed to the new Collection’s constructor as apiconfig. If no value is provided, defaults to the configuration passed to this instance’s constructor.

@synchronized
def api_response(self) -> Optional[Dict[str, Any]]:
1410    @synchronized
1411    def api_response(self) -> Optional[Dict[str, Any]]:
1412        """Get this instance's associated API record
1413
1414        If this Collection instance has an associated API record, return it.
1415        Otherwise, return `None`.
1416        """
1417        return self._api_response

Get this instance’s associated API record

If this Collection instance has an associated API record, return it. Otherwise, return None.

def find_or_create(self, path: str, create_type: Literal['collection', 'file']) -> Union[arvados.arvfile.ArvadosFile, Collection]:
1419    def find_or_create(
1420            self,
1421            path: str,
1422            create_type: CreateType,
1423    ) -> CollectionItem:
1424        if path == ".":
1425            return self
1426        else:
1427            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

Get the item at the given path, creating it if necessary

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of create_type.

Arguments:

  • path: str — The path to find or create within this collection.

  • create_type: Literal[COLLECTION, FILE] — The type of object to create at path if one does not exist. Passing COLLECTION creates a stream and returns the corresponding Subcollection. Passing FILE creates a new file and returns the corresponding arvados.arvfile.ArvadosFile.

def find(self, path: str) -> Union[arvados.arvfile.ArvadosFile, Collection]:
1429    def find(self, path: str) -> CollectionItem:
1430        if path == ".":
1431            return self
1432        else:
1433            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

Get the item at the given path

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method raises NotADirectoryError.

Arguments:

  • path: str — The path to find within this collection.

def remove(self, path: str, recursive: bool = False) -> None:
1435    def remove(self, path: str, recursive: bool=False) -> None:
1436        if path == ".":
1437            raise errors.ArgumentError("Cannot remove '.'")
1438        else:
1439            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)

Remove the file or stream at path

Arguments:

  • path: str — The path of the item to remove from the collection

  • recursive: bool — Controls the method’s behavior if path refers to a nonempty stream. If False (the default), this method raises OSError with errno ENOTEMPTY. If True, this method removes all items under the stream.

@must_be_writable
@synchronized
@retry_method
def save(self, properties: Optional[Dict[str, Any]] = None, storage_classes: Optional[List[str]] = None, trash_at: Optional[datetime.datetime] = None, merge: bool = True, num_retries: Optional[int] = None, preserve_version: bool = False) -> str:
1441    @must_be_writable
1442    @synchronized
1443    @retry_method
1444    def save(
1445            self,
1446            properties: Optional[Properties]=None,
1447            storage_classes: Optional[StorageClasses]=None,
1448            trash_at: Optional[datetime.datetime]=None,
1449            merge: bool=True,
1450            num_retries: Optional[int]=None,
1451            preserve_version: bool=False,
1452    ) -> str:
1453        """Save collection to an existing API record
1454
1455        This method updates the instance's corresponding API record to match
1456        the instance's state. If this instance does not have a corresponding API
1457        record yet, raises `AssertionError`. (To create a new API record, use
1458        `Collection.save_new`.) This method returns the saved collection
1459        manifest.
1460
1461        Arguments:
1462
1463        * properties: dict[str, Any] | None --- If provided, the API record will
1464          be updated with these properties. Note this will completely replace
1465          any existing properties.
1466
1467        * storage_classes: list[str] | None --- If provided, the API record will
1468          be updated with this value in the `storage_classes_desired` field.
1469          This value will also be saved on the instance and used for any
1470          changes that follow.
1471
1472        * trash_at: datetime.datetime | None --- If provided, the API record
1473          will be updated with this value in the `trash_at` field.
1474
1475        * merge: bool --- If `True` (the default), this method will first
1476          reload this collection's API record, and merge any new contents into
1477          this instance before saving changes. See `Collection.update` for
1478          details.
1479
1480        * num_retries: int | None --- The number of times to retry reloading
1481          the collection's API record from the API server. If not specified,
1482          uses the `num_retries` provided when this instance was constructed.
1483
1484        * preserve_version: bool --- This value will be passed directly
1485          to the underlying API call. If `True`, the Arvados API will
1486          preserve the versions of this collection both immediately before
1487          and after the update. If `True` when the API server is not
1488          configured with collection versioning, this method raises
1489          `arvados.errors.ArgumentError`.
1490        """
1491        if properties and type(properties) is not dict:
1492            raise errors.ArgumentError("properties must be dictionary type.")
1493
1494        if storage_classes and type(storage_classes) is not list:
1495            raise errors.ArgumentError("storage_classes must be list type.")
1496        if storage_classes:
1497            self._storage_classes_desired = storage_classes
1498
1499        if trash_at and type(trash_at) is not datetime.datetime:
1500            raise errors.ArgumentError("trash_at must be datetime type.")
1501
1502        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1503            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1504
1505        body={}
1506        if properties:
1507            body["properties"] = properties
1508        if self.storage_classes_desired():
1509            body["storage_classes_desired"] = self.storage_classes_desired()
1510        if trash_at:
1511            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1512            body["trash_at"] = t
1513        if preserve_version:
1514            body["preserve_version"] = preserve_version
1515
1516        if not self.committed():
1517            if self._has_remote_blocks:
1518                # Copy any remote blocks to the local cluster.
1519                self._copy_remote_blocks(remote_blocks={})
1520                self._has_remote_blocks = False
1521            if not self._has_collection_uuid():
1522                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1523            elif not self._has_local_collection_uuid():
1524                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1525
1526            self._my_block_manager().commit_all()
1527
1528            if merge:
1529                self.update()
1530
1531            text = self.manifest_text(strip=False)
1532            body['manifest_text'] = text
1533
1534            self._remember_api_response(self._my_api().collections().update(
1535                uuid=self._manifest_locator,
1536                body=body
1537                ).execute(num_retries=num_retries))
1538            self._manifest_text = self._api_response["manifest_text"]
1539            self._portable_data_hash = self._api_response["portable_data_hash"]
1540            self.set_committed(True)
1541        elif body:
1542            self._remember_api_response(self._my_api().collections().update(
1543                uuid=self._manifest_locator,
1544                body=body
1545                ).execute(num_retries=num_retries))
1546
1547        return self._manifest_text

Save collection to an existing API record

This method updates the instance’s corresponding API record to match the instance’s state. If this instance does not have a corresponding API record yet, raises AssertionError. (To create a new API record, use Collection.save_new.) This method returns the saved collection manifest.

Arguments:

  • properties: dict[str, Any] | None — If provided, the API record will be updated with these properties. Note this will completely replace any existing properties.

  • storage_classes: list[str] | None — If provided, the API record will be updated with this value in the storage_classes_desired field. This value will also be saved on the instance and used for any changes that follow.

  • trash_at: datetime.datetime | None — If provided, the API record will be updated with this value in the trash_at field.

  • merge: bool — If True (the default), this method will first reload this collection’s API record, and merge any new contents into this instance before saving changes. See Collection.update for details.

  • num_retries: int | None — The number of times to retry the API request that updates the collection record. If not specified, uses the num_retries provided when this instance was constructed.

  • preserve_version: bool — This value will be passed directly to the underlying API call. If True, the Arvados API will preserve the versions of this collection both immediately before and after the update. If True when the API server is not configured with collection versioning, this method raises arvados.errors.ArgumentError.

@must_be_writable
@synchronized
@retry_method
def save_new(self, name: Optional[str] = None, create_collection_record: bool = True, owner_uuid: Optional[str] = None, properties: Optional[Dict[str, Any]] = None, storage_classes: Optional[List[str]] = None, trash_at: Optional[datetime.datetime] = None, ensure_unique_name: bool = False, num_retries: Optional[int] = None, preserve_version: bool = False):
1550    @must_be_writable
1551    @synchronized
1552    @retry_method
1553    def save_new(
1554            self,
1555            name: Optional[str]=None,
1556            create_collection_record: bool=True,
1557            owner_uuid: Optional[str]=None,
1558            properties: Optional[Properties]=None,
1559            storage_classes: Optional[StorageClasses]=None,
1560            trash_at: Optional[datetime.datetime]=None,
1561            ensure_unique_name: bool=False,
1562            num_retries: Optional[int]=None,
1563            preserve_version: bool=False,
1564    ):
1565        """Save collection to a new API record
1566
1567        This method finishes uploading new data blocks and (optionally)
1568        creates a new API collection record with the provided data. If a new
1569        record is created, this instance becomes associated with that record
1570        for future updates like `save()`. This method returns the saved
1571        collection manifest.
1572
1573        Arguments:
1574
1575        * name: str | None --- The `name` field to use on the new collection
1576          record. If not specified, a generic default name is generated.
1577
1578        * create_collection_record: bool --- If `True` (the default), creates a
1579          collection record on the API server. If `False`, the method finishes
1580          all data uploads and only returns the resulting collection manifest
1581          without sending it to the API server.
1582
1583        * owner_uuid: str | None --- The `owner_uuid` field to use on the
1584          new collection record.
1585
1586        * properties: dict[str, Any] | None --- The `properties` field to use on
1587          the new collection record.
1588
1589        * storage_classes: list[str] | None --- The
1590          `storage_classes_desired` field to use on the new collection record.
1591
1592        * trash_at: datetime.datetime | None --- The `trash_at` field to use
1593          on the new collection record.
1594
1595        * ensure_unique_name: bool --- This value is passed directly to the
1596          Arvados API when creating the collection record. If `True`, the API
1597          server may modify the submitted `name` to ensure the collection's
1598          `name`+`owner_uuid` combination is unique. If `False` (the default),
1599          if a collection already exists with this same `name`+`owner_uuid`
1600          combination, creating a collection record will raise a validation
1601          error.
1602
1603        * num_retries: int | None --- The number of times to retry reloading
1604          the collection's API record from the API server. If not specified,
1605          uses the `num_retries` provided when this instance was constructed.
1606
1607        * preserve_version: bool --- This value will be passed directly
1608          to the underlying API call. If `True`, the Arvados API will
1609          preserve the versions of this collection both immediately before
1610          and after the update. If `True` when the API server is not
1611          configured with collection versioning, this method raises
1612          `arvados.errors.ArgumentError`.
1613        """
1614        if properties and type(properties) is not dict:
1615            raise errors.ArgumentError("properties must be dictionary type.")
1616
1617        if storage_classes and type(storage_classes) is not list:
1618            raise errors.ArgumentError("storage_classes must be list type.")
1619
1620        if trash_at and type(trash_at) is not datetime.datetime:
1621            raise errors.ArgumentError("trash_at must be datetime type.")
1622
1623        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1624            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1625
1626        if self._has_remote_blocks:
1627            # Copy any remote blocks to the local cluster.
1628            self._copy_remote_blocks(remote_blocks={})
1629            self._has_remote_blocks = False
1630
1631        if storage_classes:
1632            self._storage_classes_desired = storage_classes
1633
1634        self._my_block_manager().commit_all()
1635        text = self.manifest_text(strip=False)
1636
1637        if create_collection_record:
1638            if name is None:
1639                name = "New collection"
1640                ensure_unique_name = True
1641
1642            body = {"manifest_text": text,
1643                    "name": name,
1644                    "replication_desired": self.replication_desired}
1645            if owner_uuid:
1646                body["owner_uuid"] = owner_uuid
1647            if properties:
1648                body["properties"] = properties
1649            if self.storage_classes_desired():
1650                body["storage_classes_desired"] = self.storage_classes_desired()
1651            if trash_at:
1652                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1653                body["trash_at"] = t
1654            if preserve_version:
1655                body["preserve_version"] = preserve_version
1656
1657            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1658            text = self._api_response["manifest_text"]
1659
1660            self._manifest_locator = self._api_response["uuid"]
1661            self._portable_data_hash = self._api_response["portable_data_hash"]
1662
1663            self._manifest_text = text
1664            self.set_committed(True)
1665
1666        return text

Save collection to a new API record

This method finishes uploading new data blocks and (optionally) creates a new API collection record with the provided data. If a new record is created, this instance becomes associated with that record for future updates like save(). This method returns the saved collection manifest.

Arguments:

  • name: str | None — The name field to use on the new collection record. If not specified, the name "New collection" is used and ensure_unique_name is forced to True so the API server can make the name unique.

  • create_collection_record: bool — If True (the default), creates a collection record on the API server. If False, the method finishes all data uploads and only returns the resulting collection manifest without sending it to the API server.

  • owner_uuid: str | None — The owner_uuid field to use on the new collection record.

  • properties: dict[str, Any] | None — The properties field to use on the new collection record.

  • storage_classes: list[str] | None — The storage_classes_desired field to use on the new collection record.

  • trash_at: datetime.datetime | None — The trash_at field to use on the new collection record.

  • ensure_unique_name: bool — This value is passed directly to the Arvados API when creating the collection record. If True, the API server may modify the submitted name to ensure the collection’s name+owner_uuid combination is unique. If False (the default), if a collection already exists with this same name+owner_uuid combination, creating a collection record will raise a validation error.

  • num_retries: int | None — The number of times to retry the API request that creates the collection record. If not specified, uses the num_retries provided when this instance was constructed.

  • preserve_version: bool — This value will be passed directly to the underlying API call. If True, the Arvados API will preserve the versions of this collection both immediately before and after the update. If True when the API server is not configured with collection versioning, this method raises arvados.errors.ArgumentError.

@synchronized
def notify(self, event: Literal['add', 'del', 'mod', 'tok'], collection: RichCollectionBase, name: str, item: Union[arvados.arvfile.ArvadosFile, Collection]) -> None:
1749    @synchronized
1750    def notify(
1751            self,
1752            event: ChangeType,
1753            collection: 'RichCollectionBase',
1754            name: str,
1755            item: CollectionItem,
1756    ) -> None:
1757        if self._callback:
1758            self._callback(event, collection, name, item)

Notify any subscribed callback about a change to this collection

If a callback has been registered with RichCollectionBase.subscribe, it will be called with information about a change to this collection. Then this notification will be propagated to this collection’s root.

Arguments:

  • event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.

  • collection: RichCollectionBase — The collection that was modified.

  • name: str — The name of the file or stream within collection that was modified.

  • item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at name within collection.

class Subcollection(RichCollectionBase):
1761class Subcollection(RichCollectionBase):
1762    """Read and manipulate a stream/directory within an Arvados collection
1763
1764    This class represents a single stream (like a directory) within an Arvados
1765    `Collection`. It is returned by `Collection.find` and provides the same API.
1766    Operations that work on the API collection record propagate to the parent
1767    `Collection` object.
1768    """
1769
1770    def __init__(self, parent, name):
1771        super(Subcollection, self).__init__(parent)
1772        self.lock = self.root_collection().lock
1773        self._manifest_text = None
1774        self.name = name
1775        self.num_retries = parent.num_retries
1776
1777    def root_collection(self) -> 'Collection':
1778        return self.parent.root_collection()
1779
1780    def writable(self) -> bool:
1781        return self.root_collection().writable()
1782
1783    def _my_api(self):
1784        return self.root_collection()._my_api()
1785
1786    def _my_keep(self):
1787        return self.root_collection()._my_keep()
1788
1789    def _my_block_manager(self):
1790        return self.root_collection()._my_block_manager()
1791
1792    def stream_name(self) -> str:
1793        return os.path.join(self.parent.stream_name(), self.name)
1794
1795    @synchronized
1796    def clone(
1797            self,
1798            new_parent: Optional['Collection']=None,
1799            new_name: Optional[str]=None,
1800    ) -> 'Subcollection':
1801        c = Subcollection(new_parent, new_name)
1802        c._clonefrom(self)
1803        return c
1804
1805    @must_be_writable
1806    @synchronized
1807    def _reparent(self, newparent, newname):
1808        self.set_committed(False)
1809        self.flush()
1810        self.parent.remove(self.name, recursive=True)
1811        self.parent = newparent
1812        self.name = newname
1813        self.lock = self.parent.root_collection().lock
1814
1815    @synchronized
1816    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1817        """Encode empty directories by using an \056-named (".") empty file"""
1818        if len(self._items) == 0:
1819            return "%s %s 0:0:\\056\n" % (
1820                streams.escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1821        return super(Subcollection, self)._get_manifest_text(stream_name,
1822                                                             strip, normalize,
1823                                                             only_committed)

Read and manipulate a stream/directory within an Arvados collection

This class represents a single stream (like a directory) within an Arvados Collection. It is returned by Collection.find and provides the same API. Operations that work on the API collection record propagate to the parent Collection object.

Subcollection(parent, name)
1770    def __init__(self, parent, name):
1771        super(Subcollection, self).__init__(parent)
1772        self.lock = self.root_collection().lock
1773        self._manifest_text = None
1774        self.name = name
1775        self.num_retries = parent.num_retries
lock
name
num_retries
def root_collection(self) -> Collection:
1777    def root_collection(self) -> 'Collection':
1778        return self.parent.root_collection()

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def writable(self) -> bool:
1780    def writable(self) -> bool:
1781        return self.root_collection().writable()

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

def stream_name(self) -> str:
1792    def stream_name(self) -> str:
1793        return os.path.join(self.parent.stream_name(), self.name)

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

@synchronized
def clone(self, new_parent: Optional[Collection] = None, new_name: Optional[str] = None) -> Subcollection:
1795    @synchronized
1796    def clone(
1797            self,
1798            new_parent: Optional['Collection']=None,
1799            new_name: Optional[str]=None,
1800    ) -> 'Subcollection':
1801        c = Subcollection(new_parent, new_name)
1802        c._clonefrom(self)
1803        return c
class CollectionReader(Collection):
1826class CollectionReader(Collection):
1827    """Read-only `Collection` subclass
1828
1829    This class will never create or update any API collection records. You can
1830    use this class for additional code safety when you only need to read
1831    existing collections.
1832    """
1833    def __init__(self, manifest_locator_or_text, *args, **kwargs):
1834        self._in_init = True
1835        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1836        self._in_init = False
1837
1838        # Forego any locking since it should never change once initialized.
1839        self.lock = NoopLock()
1840
1841        # Backwards compatability with old CollectionReader
1842        # all_streams() and all_files()
1843        self._streams = None
1844
1845    def writable(self) -> bool:
1846        return self._in_init

Read-only Collection subclass

This class will never create or update any API collection records. You can use this class for additional code safety when you only need to read existing collections.

CollectionReader(manifest_locator_or_text, *args, **kwargs)
1833    def __init__(self, manifest_locator_or_text, *args, **kwargs):
1834        self._in_init = True
1835        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1836        self._in_init = False
1837
1838        # Forego any locking since it should never change once initialized.
1839        self.lock = NoopLock()
1840
1841        # Backwards compatibility with old CollectionReader
1842        # all_streams() and all_files()
1843        self._streams = None

Initialize a Collection object

Arguments:

  • manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value None instantiates a new collection with an empty manifest.

  • api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from apiconfig (see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling arvados.api.api to construct an arvados.api.ThreadSafeAPIClient, and using that as the api_client for every Collection.

  • keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its api_client.

  • num_retries: int — The number of times that client requests are retried. Default 10.

  • parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • apiconfig: Mapping[str, str] | None — A mapping with entries for ARVADOS_API_HOST, ARVADOS_API_TOKEN, and optionally ARVADOS_API_HOST_INSECURE. When no api_client is provided, the Collection object constructs one from these settings. If no mapping is provided, calls arvados.config.settings to get these parameters from user configuration.

  • block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • replication_desired: int | None — This controls both the value of the replication_desired field on API collection records saved by this class and the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.

  • storage_classes_desired: list[str] | None — This controls both the value of the storage_classes_desired field on API collection records saved by this class and the selection of which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.

  • put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new block_manager. It is unused when a block_manager is provided.

lock
def writable(self) -> bool:
1845    def writable(self) -> bool:
1846        return self._in_init

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.