arvados.collection

Tools to work with Arvados collections

This module provides high-level interfaces to create, read, and update Arvados collections. Most users will want to instantiate Collection objects, and use methods like Collection.open and Collection.mkdirs to read and write data in the collection. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4"""Tools to work with Arvados collections
   5
   6This module provides high-level interfaces to create, read, and update
   7Arvados collections. Most users will want to instantiate `Collection`
   8objects, and use methods like `Collection.open` and `Collection.mkdirs` to
   9read and write data in the collection. Refer to the Arvados Python SDK
  10cookbook for [an introduction to using the Collection class][cookbook].
  11
  12[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
  13"""
  14
  15from __future__ import absolute_import
  16from future.utils import listitems, listvalues, viewkeys
  17from builtins import str
  18from past.builtins import basestring
  19from builtins import object
  20import ciso8601
  21import datetime
  22import errno
  23import functools
  24import hashlib
  25import io
  26import logging
  27import os
  28import re
  29import sys
  30import threading
  31import time
  32
  33from collections import deque
  34from stat import *
  35
  36from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
  37from .keep import KeepLocator, KeepClient
  38from .stream import StreamReader
  39from ._normalize_stream import normalize_stream, escape
  40from ._ranges import Range, LocatorAndRange
  41from .safeapi import ThreadSafeApiCache
  42import arvados.config as config
  43import arvados.errors as errors
  44import arvados.util
  45import arvados.events as events
  46from arvados.retry import retry_method
  47
  48from typing import (
  49    Any,
  50    Callable,
  51    Dict,
  52    IO,
  53    Iterator,
  54    List,
  55    Mapping,
  56    Optional,
  57    Tuple,
  58    Union,
  59)
  60
# `typing.Literal` entered the standard library in Python 3.8; older
# interpreters get the `typing_extensions` backport instead.
if sys.version_info < (3, 8):
    from typing_extensions import Literal
else:
    from typing import Literal

_logger = logging.getLogger('arvados.collection')

ADD = "add"
"""Argument value for `Collection` methods to represent an added item"""
DEL = "del"
"""Argument value for `Collection` methods to represent a removed item"""
MOD = "mod"
"""Argument value for `Collection` methods to represent a modified item"""
TOK = "tok"
"""Argument value for `Collection` methods to represent an item with token differences"""
FILE = "file"
"""`create_type` value for `Collection.find_or_create`"""
COLLECTION = "collection"
"""`create_type` value for `Collection.find_or_create`"""

# Type aliases used by method signatures throughout this module.
# NOTE(review): writing `Literal[ADD, DEL]` with named constants works at
# runtime, but PEP 586 requires literal values, so strict type checkers may
# flag these aliases.
ChangeList = List[Union[
    Tuple[Literal[ADD, DEL], str, 'Collection'],
    Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
]]
ChangeType = Literal[ADD, DEL, MOD, TOK]
CollectionItem = Union[ArvadosFile, 'Collection']
ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
CreateType = Literal[COLLECTION, FILE]
Properties = Dict[str, Any]
StorageClasses = List[str]
  91
  92class CollectionBase(object):
  93    """Abstract base class for Collection classes
  94
  95    .. ATTENTION:: Internal
  96       This class is meant to be used by other parts of the SDK. User code
  97       should instantiate or subclass `Collection` or one of its subclasses
  98       directly.
  99    """
 100
 101    def __enter__(self):
 102        """Enter a context block with this collection instance"""
 103        return self
 104
 105    def __exit__(self, exc_type, exc_value, traceback):
 106        """Exit a context block with this collection instance"""
 107        pass
 108
 109    def _my_keep(self):
 110        if self._keep_client is None:
 111            self._keep_client = KeepClient(api_client=self._api_client,
 112                                           num_retries=self.num_retries)
 113        return self._keep_client
 114
 115    def stripped_manifest(self) -> str:
 116        """Create a copy of the collection manifest with only size hints
 117
 118        This method returns a string with the current collection's manifest
 119        text with all non-portable locator hints like permission hints and
 120        remote cluster hints removed. The only hints in the returned manifest
 121        will be size hints.
 122        """
 123        raw = self.manifest_text()
 124        clean = []
 125        for line in raw.split("\n"):
 126            fields = line.split()
 127            if fields:
 128                clean_fields = fields[:1] + [
 129                    (re.sub(r'\+[^\d][^\+]*', '', x)
 130                     if re.match(arvados.util.keep_locator_pattern, x)
 131                     else x)
 132                    for x in fields[1:]]
 133                clean += [' '.join(clean_fields), "\n"]
 134        return ''.join(clean)
 135
 136
 137class _WriterFile(_FileLikeObjectBase):
 138    def __init__(self, coll_writer, name):
 139        super(_WriterFile, self).__init__(name, 'wb')
 140        self.dest = coll_writer
 141
 142    def close(self):
 143        super(_WriterFile, self).close()
 144        self.dest.finish_current_file()
 145
 146    @_FileLikeObjectBase._before_close
 147    def write(self, data):
 148        self.dest.write(data)
 149
 150    @_FileLikeObjectBase._before_close
 151    def writelines(self, seq):
 152        for data in seq:
 153            self.write(data)
 154
 155    @_FileLikeObjectBase._before_close
 156    def flush(self):
 157        self.dest.flush_data()
 158
 159
 160class RichCollectionBase(CollectionBase):
 161    """Base class for Collection classes
 162
 163    .. ATTENTION:: Internal
 164       This class is meant to be used by other parts of the SDK. User code
 165       should instantiate or subclass `Collection` or one of its subclasses
 166       directly.
 167    """
 168
 169    def __init__(self, parent=None):
 170        self.parent = parent
 171        self._committed = False
 172        self._has_remote_blocks = False
 173        self._callback = None
 174        self._items = {}
 175
    def _my_api(self):
        # Abstract hook: subclasses return the API client object to use
        # (presumably an arvados API client — confirm in subclasses).
        raise NotImplementedError()
 178
    def _my_keep(self):
        # Abstract hook: subclasses return the KeepClient object to use.
        raise NotImplementedError()
 181
    def _my_block_manager(self):
        # Abstract hook: subclasses return the block manager; callers in
        # this class use it for commit_all/is_bufferblock/get_bufferblock.
        raise NotImplementedError()
 184
    def writable(self) -> bool:
        """Indicate whether this collection object can be modified

        This method returns `False` if this object is a `CollectionReader`,
        else `True`.
        """
        # Abstract: implemented by concrete subclasses.
        raise NotImplementedError()
 192
    def root_collection(self) -> 'Collection':
        """Get this collection's root collection object

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the source Collection object.
        """
        # Abstract: implemented by concrete subclasses.
        raise NotImplementedError()
 200
    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this collection

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the name of the stream you opened.
        """
        # Abstract: implemented by concrete subclasses.
        raise NotImplementedError()
 208
 209    @synchronized
 210    def has_remote_blocks(self) -> bool:
 211        """Indiciate whether the collection refers to remote data
 212
 213        Returns `True` if the collection manifest includes any Keep locators
 214        with a remote hint (`+R`), else `False`.
 215        """
 216        if self._has_remote_blocks:
 217            return True
 218        for item in self:
 219            if self[item].has_remote_blocks():
 220                return True
 221        return False
 222
 223    @synchronized
 224    def set_has_remote_blocks(self, val: bool) -> None:
 225        """Cache whether this collection refers to remote blocks
 226
 227        .. ATTENTION:: Internal
 228           This method is only meant to be used by other Collection methods.
 229
 230        Set this collection's cached "has remote blocks" flag to the given
 231        value.
 232        """
 233        self._has_remote_blocks = val
 234        if self.parent:
 235            self.parent.set_has_remote_blocks(val)
 236
 237    @must_be_writable
 238    @synchronized
 239    def find_or_create(
 240            self,
 241            path: str,
 242            create_type: CreateType,
 243    ) -> CollectionItem:
 244        """Get the item at the given path, creating it if necessary
 245
 246        If `path` refers to a stream in this collection, returns a
 247        corresponding `Subcollection` object. If `path` refers to a file in
 248        this collection, returns a corresponding
 249        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 250        this collection, then this method creates a new object and returns
 251        it, creating parent streams as needed. The type of object created is
 252        determined by the value of `create_type`.
 253
 254        Arguments:
 255
 256        * path: str --- The path to find or create within this collection.
 257
 258        * create_type: Literal[COLLECTION, FILE] --- The type of object to
 259          create at `path` if one does not exist. Passing `COLLECTION`
 260          creates a stream and returns the corresponding
 261          `Subcollection`. Passing `FILE` creates a new file and returns the
 262          corresponding `arvados.arvfile.ArvadosFile`.
 263        """
 264        pathcomponents = path.split("/", 1)
 265        if pathcomponents[0]:
 266            item = self._items.get(pathcomponents[0])
 267            if len(pathcomponents) == 1:
 268                if item is None:
 269                    # create new file
 270                    if create_type == COLLECTION:
 271                        item = Subcollection(self, pathcomponents[0])
 272                    else:
 273                        item = ArvadosFile(self, pathcomponents[0])
 274                    self._items[pathcomponents[0]] = item
 275                    self.set_committed(False)
 276                    self.notify(ADD, self, pathcomponents[0], item)
 277                return item
 278            else:
 279                if item is None:
 280                    # create new collection
 281                    item = Subcollection(self, pathcomponents[0])
 282                    self._items[pathcomponents[0]] = item
 283                    self.set_committed(False)
 284                    self.notify(ADD, self, pathcomponents[0], item)
 285                if isinstance(item, RichCollectionBase):
 286                    return item.find_or_create(pathcomponents[1], create_type)
 287                else:
 288                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 289        else:
 290            return self
 291
 292    @synchronized
 293    def find(self, path: str) -> CollectionItem:
 294        """Get the item at the given path
 295
 296        If `path` refers to a stream in this collection, returns a
 297        corresponding `Subcollection` object. If `path` refers to a file in
 298        this collection, returns a corresponding
 299        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 300        this collection, then this method raises `NotADirectoryError`.
 301
 302        Arguments:
 303
 304        * path: str --- The path to find or create within this collection.
 305        """
 306        if not path:
 307            raise errors.ArgumentError("Parameter 'path' is empty.")
 308
 309        pathcomponents = path.split("/", 1)
 310        if pathcomponents[0] == '':
 311            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 312
 313        item = self._items.get(pathcomponents[0])
 314        if item is None:
 315            return None
 316        elif len(pathcomponents) == 1:
 317            return item
 318        else:
 319            if isinstance(item, RichCollectionBase):
 320                if pathcomponents[1]:
 321                    return item.find(pathcomponents[1])
 322                else:
 323                    return item
 324            else:
 325                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 326
 327    @synchronized
 328    def mkdirs(self, path: str) -> 'Subcollection':
 329        """Create and return a subcollection at `path`
 330
 331        If `path` exists within this collection, raises `FileExistsError`.
 332        Otherwise, creates a stream at that path and returns the
 333        corresponding `Subcollection`.
 334        """
 335        if self.find(path) != None:
 336            raise IOError(errno.EEXIST, "Directory or file exists", path)
 337
 338        return self.find_or_create(path, COLLECTION)
 339
 340    def open(
 341            self,
 342            path: str,
 343            mode: str="r",
 344            encoding: Optional[str]=None,
 345    ) -> IO:
 346        """Open a file-like object within the collection
 347
 348        This method returns a file-like object that can read and/or write the
 349        file located at `path` within the collection. If you attempt to write
 350        a `path` that does not exist, the file is created with `find_or_create`.
 351        If the file cannot be opened for any other reason, this method raises
 352        `OSError` with an appropriate errno.
 353
 354        Arguments:
 355
 356        * path: str --- The path of the file to open within this collection
 357
 358        * mode: str --- The mode to open this file. Supports all the same
 359          values as `builtins.open`.
 360
 361        * encoding: str | None --- The text encoding of the file. Only used
 362          when the file is opened in text mode. The default is
 363          platform-dependent.
 364        """
 365        if not re.search(r'^[rwa][bt]?\+?$', mode):
 366            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 367
 368        if mode[0] == 'r' and '+' not in mode:
 369            fclass = ArvadosFileReader
 370            arvfile = self.find(path)
 371        elif not self.writable():
 372            raise IOError(errno.EROFS, "Collection is read only")
 373        else:
 374            fclass = ArvadosFileWriter
 375            arvfile = self.find_or_create(path, FILE)
 376
 377        if arvfile is None:
 378            raise IOError(errno.ENOENT, "File not found", path)
 379        if not isinstance(arvfile, ArvadosFile):
 380            raise IOError(errno.EISDIR, "Is a directory", path)
 381
 382        if mode[0] == 'w':
 383            arvfile.truncate(0)
 384
 385        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
 386        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
 387        if 'b' not in mode:
 388            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
 389            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
 390        return f
 391
    def modified(self) -> bool:
        """Indicate whether this collection has local changes

        Returns `False` if this collection corresponds to a record loaded from
        the API server, `True` otherwise.
        """
        # Exactly the inverse of `committed()`.
        return not self.committed()
 399
    @synchronized
    def committed(self) -> bool:
        """Indicate whether this collection has an API server record

        Returns `True` if this collection corresponds to a record loaded from
        the API server, `False` otherwise.
        """
        return self._committed
 408
 409    @synchronized
 410    def set_committed(self, value: bool=True):
 411        """Cache whether this collection has an API server record
 412
 413        .. ATTENTION:: Internal
 414           This method is only meant to be used by other Collection methods.
 415
 416        Set this collection's cached "committed" flag to the given
 417        value and propagates it as needed.
 418        """
 419        if value == self._committed:
 420            return
 421        if value:
 422            for k,v in listitems(self._items):
 423                v.set_committed(True)
 424            self._committed = True
 425        else:
 426            self._committed = False
 427            if self.parent is not None:
 428                self.parent.set_committed(False)
 429
 430    @synchronized
 431    def __iter__(self) -> Iterator[str]:
 432        """Iterate names of streams and files in this collection
 433
 434        This method does not recurse. It only iterates the contents of this
 435        collection's corresponding stream.
 436        """
 437        return iter(viewkeys(self._items))
 438
    @synchronized
    def __getitem__(self, k: str) -> CollectionItem:
        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection

        This method does not recurse. If you want to search a path, use
        `RichCollectionBase.find` instead.

        Raises `KeyError` if no item named `k` exists in this stream.
        """
        return self._items[k]
 447
    @synchronized
    def __contains__(self, k: str) -> bool:
        """Indicate whether this collection has an item with this name

        This method does not recurse. If you want to check a path, use
        `RichCollectionBase.exists` instead.
        """
        return k in self._items
 456
    @synchronized
    def __len__(self) -> int:
        """Get the number of items directly contained in this collection

        This method does not recurse. It only counts the streams and files
        in this collection's corresponding stream.
        """
        return len(self._items)
 465
 466    @must_be_writable
 467    @synchronized
 468    def __delitem__(self, p: str) -> None:
 469        """Delete an item from this collection's stream
 470
 471        This method does not recurse. If you want to remove an item by a
 472        path, use `RichCollectionBase.remove` instead.
 473        """
 474        del self._items[p]
 475        self.set_committed(False)
 476        self.notify(DEL, self, p, None)
 477
    @synchronized
    def keys(self) -> Iterator[str]:
        """Iterate names of streams and files in this collection

        This method does not recurse. It only iterates the contents of this
        collection's corresponding stream. Note the return value is a dict
        keys view, not a list.
        """
        return self._items.keys()
 486
 487    @synchronized
 488    def values(self) -> List[CollectionItem]:
 489        """Get a list of objects in this collection's stream
 490
 491        The return value includes a `Subcollection` for every stream, and an
 492        `arvados.arvfile.ArvadosFile` for every file, directly within this
 493        collection's stream.  This method does not recurse.
 494        """
 495        return listvalues(self._items)
 496
 497    @synchronized
 498    def items(self) -> List[Tuple[str, CollectionItem]]:
 499        """Get a list of `(name, object)` tuples from this collection's stream
 500
 501        The return value includes a `Subcollection` for every stream, and an
 502        `arvados.arvfile.ArvadosFile` for every file, directly within this
 503        collection's stream.  This method does not recurse.
 504        """
 505        return listitems(self._items)
 506
 507    def exists(self, path: str) -> bool:
 508        """Indicate whether this collection includes an item at `path`
 509
 510        This method returns `True` if `path` refers to a stream or file within
 511        this collection, else `False`.
 512
 513        Arguments:
 514
 515        * path: str --- The path to check for existence within this collection
 516        """
 517        return self.find(path) is not None
 518
 519    @must_be_writable
 520    @synchronized
 521    def remove(self, path: str, recursive: bool=False) -> None:
 522        """Remove the file or stream at `path`
 523
 524        Arguments:
 525
 526        * path: str --- The path of the item to remove from the collection
 527
 528        * recursive: bool --- Controls the method's behavior if `path` refers
 529          to a nonempty stream. If `False` (the default), this method raises
 530          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
 531          items under the stream.
 532        """
 533        if not path:
 534            raise errors.ArgumentError("Parameter 'path' is empty.")
 535
 536        pathcomponents = path.split("/", 1)
 537        item = self._items.get(pathcomponents[0])
 538        if item is None:
 539            raise IOError(errno.ENOENT, "File not found", path)
 540        if len(pathcomponents) == 1:
 541            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
 542                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
 543            deleteditem = self._items[pathcomponents[0]]
 544            del self._items[pathcomponents[0]]
 545            self.set_committed(False)
 546            self.notify(DEL, self, pathcomponents[0], deleteditem)
 547        else:
 548            item.remove(pathcomponents[1], recursive=recursive)
 549
 550    def _clonefrom(self, source):
 551        for k,v in listitems(source):
 552            self._items[k] = v.clone(self, k)
 553
    def clone(self):
        # Abstract: subclasses return a copy of this item; `_clonefrom`
        # calls it as `clone(new_parent, new_name)` on each child.
        raise NotImplementedError()
 556
 557    @must_be_writable
 558    @synchronized
 559    def add(
 560            self,
 561            source_obj: CollectionItem,
 562            target_name: str,
 563            overwrite: bool=False,
 564            reparent: bool=False,
 565    ) -> None:
 566        """Copy or move a file or subcollection object to this collection
 567
 568        Arguments:
 569
 570        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
 571          to add to this collection
 572
 573        * target_name: str --- The path inside this collection where
 574          `source_obj` should be added.
 575
 576        * overwrite: bool --- Controls the behavior of this method when the
 577          collection already contains an object at `target_name`. If `False`
 578          (the default), this method will raise `FileExistsError`. If `True`,
 579          the object at `target_name` will be replaced with `source_obj`.
 580
 581        * reparent: bool --- Controls whether this method copies or moves
 582          `source_obj`. If `False` (the default), `source_obj` is copied into
 583          this collection. If `True`, `source_obj` is moved into this
 584          collection.
 585        """
 586        if target_name in self and not overwrite:
 587            raise IOError(errno.EEXIST, "File already exists", target_name)
 588
 589        modified_from = None
 590        if target_name in self:
 591            modified_from = self[target_name]
 592
 593        # Actually make the move or copy.
 594        if reparent:
 595            source_obj._reparent(self, target_name)
 596            item = source_obj
 597        else:
 598            item = source_obj.clone(self, target_name)
 599
 600        self._items[target_name] = item
 601        self.set_committed(False)
 602        if not self._has_remote_blocks and source_obj.has_remote_blocks():
 603            self.set_has_remote_blocks(True)
 604
 605        if modified_from:
 606            self.notify(MOD, self, target_name, (modified_from, item))
 607        else:
 608            self.notify(ADD, self, target_name, item)
 609
 610    def _get_src_target(self, source, target_path, source_collection, create_dest):
 611        if source_collection is None:
 612            source_collection = self
 613
 614        # Find the object
 615        if isinstance(source, basestring):
 616            source_obj = source_collection.find(source)
 617            if source_obj is None:
 618                raise IOError(errno.ENOENT, "File not found", source)
 619            sourcecomponents = source.split("/")
 620        else:
 621            source_obj = source
 622            sourcecomponents = None
 623
 624        # Find parent collection the target path
 625        targetcomponents = target_path.split("/")
 626
 627        # Determine the name to use.
 628        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 629
 630        if not target_name:
 631            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 632
 633        if create_dest:
 634            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 635        else:
 636            if len(targetcomponents) > 1:
 637                target_dir = self.find("/".join(targetcomponents[0:-1]))
 638            else:
 639                target_dir = self
 640
 641        if target_dir is None:
 642            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 643
 644        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
 645            target_dir = target_dir[target_name]
 646            target_name = sourcecomponents[-1]
 647
 648        return (source_obj, target_dir, target_name)
 649
 650    @must_be_writable
 651    @synchronized
 652    def copy(
 653            self,
 654            source: Union[str, CollectionItem],
 655            target_path: str,
 656            source_collection: Optional['RichCollectionBase']=None,
 657            overwrite: bool=False,
 658    ) -> None:
 659        """Copy a file or subcollection object to this collection
 660
 661        Arguments:
 662
 663        * source: str | arvados.arvfile.ArvadosFile |
 664          arvados.collection.Subcollection --- The file or subcollection to
 665          add to this collection. If `source` is a str, the object will be
 666          found by looking up this path from `source_collection` (see
 667          below).
 668
 669        * target_path: str --- The path inside this collection where the
 670          source object should be added.
 671
 672        * source_collection: arvados.collection.Collection | None --- The
 673          collection to find the source object from when `source` is a
 674          path. Defaults to the current collection (`self`).
 675
 676        * overwrite: bool --- Controls the behavior of this method when the
 677          collection already contains an object at `target_path`. If `False`
 678          (the default), this method will raise `FileExistsError`. If `True`,
 679          the object at `target_path` will be replaced with `source_obj`.
 680        """
 681        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 682        target_dir.add(source_obj, target_name, overwrite, False)
 683
 684    @must_be_writable
 685    @synchronized
 686    def rename(
 687            self,
 688            source: Union[str, CollectionItem],
 689            target_path: str,
 690            source_collection: Optional['RichCollectionBase']=None,
 691            overwrite: bool=False,
 692    ) -> None:
 693        """Move a file or subcollection object to this collection
 694
 695        Arguments:
 696
 697        * source: str | arvados.arvfile.ArvadosFile |
 698          arvados.collection.Subcollection --- The file or subcollection to
 699          add to this collection. If `source` is a str, the object will be
 700          found by looking up this path from `source_collection` (see
 701          below).
 702
 703        * target_path: str --- The path inside this collection where the
 704          source object should be added.
 705
 706        * source_collection: arvados.collection.Collection | None --- The
 707          collection to find the source object from when `source` is a
 708          path. Defaults to the current collection (`self`).
 709
 710        * overwrite: bool --- Controls the behavior of this method when the
 711          collection already contains an object at `target_path`. If `False`
 712          (the default), this method will raise `FileExistsError`. If `True`,
 713          the object at `target_path` will be replaced with `source_obj`.
 714        """
 715        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 716        if not source_obj.writable():
 717            raise IOError(errno.EROFS, "Source collection is read only", source)
 718        target_dir.add(source_obj, target_name, overwrite, True)
 719
 720    def portable_manifest_text(self, stream_name: str=".") -> str:
 721        """Get the portable manifest text for this collection
 722
 723        The portable manifest text is normalized, and does not include access
 724        tokens. This method does not flush outstanding blocks to Keep.
 725
 726        Arguments:
 727
 728        * stream_name: str --- The name to use for this collection's stream in
 729          the generated manifest. Default `'.'`.
 730        """
 731        return self._get_manifest_text(stream_name, True, True)
 732
 733    @synchronized
 734    def manifest_text(
 735            self,
 736            stream_name: str=".",
 737            strip: bool=False,
 738            normalize: bool=False,
 739            only_committed: bool=False,
 740    ) -> str:
 741        """Get the manifest text for this collection
 742
 743        Arguments:
 744
 745        * stream_name: str --- The name to use for this collection's stream in
 746          the generated manifest. Default `'.'`.
 747
 748        * strip: bool --- Controls whether or not the returned manifest text
 749          includes access tokens. If `False` (the default), the manifest text
 750          will include access tokens. If `True`, the manifest text will not
 751          include access tokens.
 752
 753        * normalize: bool --- Controls whether or not the returned manifest
 754          text is normalized. Default `False`.
 755
 756        * only_committed: bool --- Controls whether or not this method uploads
 757          pending data to Keep before building and returning the manifest text.
 758          If `False` (the default), this method will finish uploading all data
 759          to Keep, then return the final manifest. If `True`, this method will
 760          build and return a manifest that only refers to the data that has
 761          finished uploading at the time this method was called.
 762        """
 763        if not only_committed:
 764            self._my_block_manager().commit_all()
 765        return self._get_manifest_text(stream_name, strip, normalize,
 766                                       only_committed=only_committed)
 767
    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.

        """

        # Rebuild the manifest when there are uncommitted local changes, when
        # no cached manifest text exists, or when the caller explicitly asked
        # for normalized output; otherwise reuse the cached text below.
        if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            # All files directly in this directory become one manifest stream.
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Collect the locator/range segments that make up this file.
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # This segment still lives in an in-memory buffer block.
                        if only_committed:
                            # Caller wants committed data only; drop it.
                            continue
                        # Substitute the buffer block's permanent locator.
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        # Remove permission signatures from the locator.
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                         segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            # Recurse into subdirectories. Their streams are regenerated here,
            # so they are requested in normalized form regardless of `normalize`.
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(
                    stream_name=os.path.join(stream_name, dirname),
                    strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            # Unmodified collection: return the cached manifest text verbatim,
            # optionally with access tokens stripped.
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text
 821
 822    @synchronized
 823    def _copy_remote_blocks(self, remote_blocks={}):
 824        """Scan through the entire collection and ask Keep to copy remote blocks.
 825
 826        When accessing a remote collection, blocks will have a remote signature
 827        (+R instead of +A). Collect these signatures and request Keep to copy the
 828        blocks to the local cluster, returning local (+A) signatures.
 829
 830        :remote_blocks:
 831          Shared cache of remote to local block mappings. This is used to avoid
 832          doing extra work when blocks are shared by more than one file in
 833          different subdirectories.
 834
 835        """
 836        for item in self:
 837            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
 838        return remote_blocks
 839
 840    @synchronized
 841    def diff(
 842            self,
 843            end_collection: 'RichCollectionBase',
 844            prefix: str=".",
 845            holding_collection: Optional['Collection']=None,
 846    ) -> ChangeList:
 847        """Build a list of differences between this collection and another
 848
 849        Arguments:
 850
 851        * end_collection: arvados.collection.RichCollectionBase --- A
 852          collection object with the desired end state. The returned diff
 853          list will describe how to go from the current collection object
 854          `self` to `end_collection`.
 855
 856        * prefix: str --- The name to use for this collection's stream in
 857          the diff list. Default `'.'`.
 858
 859        * holding_collection: arvados.collection.Collection | None --- A
 860          collection object used to hold objects for the returned diff
 861          list. By default, a new empty collection is created.
 862        """
 863        changes = []
 864        if holding_collection is None:
 865            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
 866        for k in self:
 867            if k not in end_collection:
 868               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
 869        for k in end_collection:
 870            if k in self:
 871                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
 872                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
 873                elif end_collection[k] != self[k]:
 874                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 875                else:
 876                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
 877            else:
 878                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
 879        return changes
 880
    @must_be_writable
    @synchronized
    def apply(self, changes: ChangeList) -> None:
        """Apply a list of changes to this collection

        This method takes a list of changes generated by
        `RichCollectionBase.diff` and applies it to this
        collection. Afterward, the state of this collection object will
        match the state of `end_collection` passed to `diff`. If a change
        conflicts with a local change, it will be saved to an alternate path
        indicating the conflict.

        Arguments:

        * changes: arvados.collection.ChangeList --- The list of differences
          generated by `RichCollectionBase.diff`.
        """
        # Any non-empty change list will modify this collection, so mark it
        # uncommitted up front.
        if changes:
            self.set_committed(False)
        for change in changes:
            # Each change tuple is (event_type, path, initial[, final]).
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            # Timestamped alternate path used when an incoming change collides
            # with a local edit at the same path.
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
 937
 938    def portable_data_hash(self) -> str:
 939        """Get the portable data hash for this collection's manifest"""
 940        if self._manifest_locator and self.committed():
 941            # If the collection is already saved on the API server, and it's committed
 942            # then return API server's PDH response.
 943            return self._portable_data_hash
 944        else:
 945            stripped = self.portable_manifest_text().encode()
 946            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 947
 948    @synchronized
 949    def subscribe(self, callback: ChangeCallback) -> None:
 950        """Set a notify callback for changes to this collection
 951
 952        Arguments:
 953
 954        * callback: arvados.collection.ChangeCallback --- The callable to
 955          call each time the collection is changed.
 956        """
 957        if self._callback is None:
 958            self._callback = callback
 959        else:
 960            raise errors.ArgumentError("A callback is already set on this collection.")
 961
 962    @synchronized
 963    def unsubscribe(self) -> None:
 964        """Remove any notify callback set for changes to this collection"""
 965        if self._callback is not None:
 966            self._callback = None
 967
 968    @synchronized
 969    def notify(
 970            self,
 971            event: ChangeType,
 972            collection: 'RichCollectionBase',
 973            name: str,
 974            item: CollectionItem,
 975    ) -> None:
 976        """Notify any subscribed callback about a change to this collection
 977
 978        .. ATTENTION:: Internal
 979           This method is only meant to be used by other Collection methods.
 980
 981        If a callback has been registered with `RichCollectionBase.subscribe`,
 982        it will be called with information about a change to this collection.
 983        Then this notification will be propagated to this collection's root.
 984
 985        Arguments:
 986
 987        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 988          the collection.
 989
 990        * collection: arvados.collection.RichCollectionBase --- The
 991          collection that was modified.
 992
 993        * name: str --- The name of the file or stream within `collection` that
 994          was modified.
 995
 996        * item: arvados.arvfile.ArvadosFile |
 997          arvados.collection.Subcollection --- The new contents at `name`
 998          within `collection`.
 999        """
1000        if self._callback:
1001            self._callback(event, collection, name, item)
1002        self.root_collection().notify(event, collection, name, item)
1003
1004    @synchronized
1005    def __eq__(self, other: Any) -> bool:
1006        """Indicate whether this collection object is equal to another"""
1007        if other is self:
1008            return True
1009        if not isinstance(other, RichCollectionBase):
1010            return False
1011        if len(self._items) != len(other):
1012            return False
1013        for k in self._items:
1014            if k not in other:
1015                return False
1016            if self._items[k] != other[k]:
1017                return False
1018        return True
1019
1020    def __ne__(self, other: Any) -> bool:
1021        """Indicate whether this collection object is not equal to another"""
1022        return not self.__eq__(other)
1023
1024    @synchronized
1025    def flush(self) -> None:
1026        """Upload any pending data to Keep"""
1027        for e in listvalues(self):
1028            e.flush()
1029
1030
1031class Collection(RichCollectionBase):
1032    """Read and manipulate an Arvados collection
1033
1034    This class provides a high-level interface to create, read, and update
1035    Arvados collections and their contents. Refer to the Arvados Python SDK
1036    cookbook for [an introduction to using the Collection class][cookbook].
1037
1038    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1039    """
1040
1041    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1042                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1043                 keep_client: Optional['arvados.keep.KeepClient']=None,
1044                 num_retries: int=10,
1045                 parent: Optional['Collection']=None,
1046                 apiconfig: Optional[Mapping[str, str]]=None,
1047                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1048                 replication_desired: Optional[int]=None,
1049                 storage_classes_desired: Optional[List[str]]=None,
1050                 put_threads: Optional[int]=None):
1051        """Initialize a Collection object
1052
1053        Arguments:
1054
1055        * manifest_locator_or_text: str | None --- This string can contain a
1056          collection manifest text, portable data hash, or UUID. When given a
1057          portable data hash or UUID, this instance will load a collection
1058          record from the API server. Otherwise, this instance will represent a
1059          new collection without an API server record. The default value `None`
1060          instantiates a new collection with an empty manifest.
1061
1062        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1063          Arvados API client object this instance uses to make requests. If
1064          none is given, this instance creates its own client using the
1065          settings from `apiconfig` (see below). If your client instantiates
1066          many Collection objects, you can help limit memory utilization by
1067          calling `arvados.api.api` to construct an
1068          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1069          for every Collection.
1070
1071        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1072          object this instance uses to make requests. If none is given, this
1073          instance creates its own client using its `api_client`.
1074
1075        * num_retries: int --- The number of times that client requests are
1076          retried. Default 10.
1077
1078        * parent: arvados.collection.Collection | None --- The parent Collection
1079          object of this instance, if any. This argument is primarily used by
1080          other Collection methods; user client code shouldn't need to use it.
1081
1082        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1083          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1084          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1085          Collection object constructs one from these settings. If no
1086          mapping is provided, calls `arvados.config.settings` to get these
1087          parameters from user configuration.
1088
1089        * block_manager: arvados.arvfile._BlockManager | None --- The
1090          _BlockManager object used by this instance to coordinate reading
1091          and writing Keep data blocks. If none is given, this instance
1092          constructs its own. This argument is primarily used by other
1093          Collection methods; user client code shouldn't need to use it.
1094
1095        * replication_desired: int | None --- This controls both the value of
1096          the `replication_desired` field on API collection records saved by
1097          this class, as well as the number of Keep services that the object
1098          writes new data blocks to. If none is given, uses the default value
1099          configured for the cluster.
1100
1101        * storage_classes_desired: list[str] | None --- This controls both
1102          the value of the `storage_classes_desired` field on API collection
1103          records saved by this class, as well as selecting which specific
1104          Keep services the object writes new data blocks to. If none is
1105          given, defaults to an empty list.
1106
1107        * put_threads: int | None --- The number of threads to run
1108          simultaneously to upload data blocks to Keep. This value is used when
1109          building a new `block_manager`. It is unused when a `block_manager`
1110          is provided.
1111        """
1112
1113        if storage_classes_desired and type(storage_classes_desired) is not list:
1114            raise errors.ArgumentError("storage_classes_desired must be list type.")
1115
1116        super(Collection, self).__init__(parent)
1117        self._api_client = api_client
1118        self._keep_client = keep_client
1119
1120        # Use the keep client from ThreadSafeApiCache
1121        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1122            self._keep_client = self._api_client.keep
1123
1124        self._block_manager = block_manager
1125        self.replication_desired = replication_desired
1126        self._storage_classes_desired = storage_classes_desired
1127        self.put_threads = put_threads
1128
1129        if apiconfig:
1130            self._config = apiconfig
1131        else:
1132            self._config = config.settings()
1133
1134        self.num_retries = num_retries
1135        self._manifest_locator = None
1136        self._manifest_text = None
1137        self._portable_data_hash = None
1138        self._api_response = None
1139        self._past_versions = set()
1140
1141        self.lock = threading.RLock()
1142        self.events = None
1143
1144        if manifest_locator_or_text:
1145            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1146                self._manifest_locator = manifest_locator_or_text
1147            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1148                self._manifest_locator = manifest_locator_or_text
1149                if not self._has_local_collection_uuid():
1150                    self._has_remote_blocks = True
1151            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1152                self._manifest_text = manifest_locator_or_text
1153                if '+R' in self._manifest_text:
1154                    self._has_remote_blocks = True
1155            else:
1156                raise errors.ArgumentError(
1157                    "Argument to CollectionReader is not a manifest or a collection UUID")
1158
1159            try:
1160                self._populate()
1161            except errors.SyntaxError as e:
1162                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1163
1164    def storage_classes_desired(self) -> List[str]:
1165        """Get this collection's `storage_classes_desired` value"""
1166        return self._storage_classes_desired or []
1167
1168    def root_collection(self) -> 'Collection':
1169        return self
1170
1171    def get_properties(self) -> Properties:
1172        """Get this collection's properties
1173
1174        This method always returns a dict. If this collection object does not
1175        have an associated API record, or that record does not have any
1176        properties set, this method returns an empty dict.
1177        """
1178        if self._api_response and self._api_response["properties"]:
1179            return self._api_response["properties"]
1180        else:
1181            return {}
1182
1183    def get_trash_at(self) -> Optional[datetime.datetime]:
1184        """Get this collection's `trash_at` field
1185
1186        This method parses the `trash_at` field of the collection's API
1187        record and returns a datetime from it. If that field is not set, or
1188        this collection object does not have an associated API record,
1189        returns None.
1190        """
1191        if self._api_response and self._api_response["trash_at"]:
1192            try:
1193                return ciso8601.parse_datetime(self._api_response["trash_at"])
1194            except ValueError:
1195                return None
1196        else:
1197            return None
1198
1199    def stream_name(self) -> str:
1200        return "."
1201
1202    def writable(self) -> bool:
1203        return True
1204
1205    @synchronized
1206    def known_past_version(
1207            self,
1208            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1209    ) -> bool:
1210        """Indicate whether an API record for this collection has been seen before
1211
1212        As this collection object loads records from the API server, it records
1213        their `modified_at` and `portable_data_hash` fields. This method accepts
1214        a 2-tuple with values for those fields, and returns `True` if the
1215        combination was previously loaded.
1216        """
1217        return modified_at_and_portable_data_hash in self._past_versions
1218
    @synchronized
    @retry_method
    def update(
            self,
            other: Optional['Collection']=None,
            num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it.  If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
        """
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._remember_api_response(response)
            # Wrap the fetched manifest so it can be diffed below.
            other = CollectionReader(response["manifest_text"])
        # Diff against the manifest as of the last load/save, then apply that
        # change list to pick up `other`'s changes (flagging conflicts).
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        # Record the merged result as the new baseline manifest.
        self._manifest_text = self.manifest_text()
1267
1268    @synchronized
1269    def _my_api(self):
1270        if self._api_client is None:
1271            self._api_client = ThreadSafeApiCache(self._config, version='v1')
1272            if self._keep_client is None:
1273                self._keep_client = self._api_client.keep
1274        return self._api_client
1275
1276    @synchronized
1277    def _my_keep(self):
1278        if self._keep_client is None:
1279            if self._api_client is None:
1280                self._my_api()
1281            else:
1282                self._keep_client = KeepClient(api_client=self._api_client)
1283        return self._keep_client
1284
1285    @synchronized
1286    def _my_block_manager(self):
1287        if self._block_manager is None:
1288            copies = (self.replication_desired or
1289                      self._my_api()._rootDesc.get('defaultCollectionReplication',
1290                                                   2))
1291            self._block_manager = _BlockManager(self._my_keep(),
1292                                                copies=copies,
1293                                                put_threads=self.put_threads,
1294                                                num_retries=self.num_retries,
1295                                                storage_classes_func=self.storage_classes_desired)
1296        return self._block_manager
1297
1298    def _remember_api_response(self, response):
1299        self._api_response = response
1300        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1301
1302    def _populate_from_api_server(self):
1303        # As in KeepClient itself, we must wait until the last
1304        # possible moment to instantiate an API client, in order to
1305        # avoid tripping up clients that don't have access to an API
1306        # server.  If we do build one, make sure our Keep client uses
1307        # it.  If instantiation fails, we'll fall back to the except
1308        # clause, just like any other Collection lookup
1309        # failure. Return an exception, or None if successful.
1310        self._remember_api_response(self._my_api().collections().get(
1311            uuid=self._manifest_locator).execute(
1312                num_retries=self.num_retries))
1313        self._manifest_text = self._api_response['manifest_text']
1314        self._portable_data_hash = self._api_response['portable_data_hash']
1315        # If not overriden via kwargs, we should try to load the
1316        # replication_desired and storage_classes_desired from the API server
1317        if self.replication_desired is None:
1318            self.replication_desired = self._api_response.get('replication_desired', None)
1319        if self._storage_classes_desired is None:
1320            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1321
1322    def _populate(self):
1323        if self._manifest_text is None:
1324            if self._manifest_locator is None:
1325                return
1326            else:
1327                self._populate_from_api_server()
1328        self._baseline_manifest = self._manifest_text
1329        self._import_manifest(self._manifest_text)
1330
1331    def _has_collection_uuid(self):
1332        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1333
1334    def _has_local_collection_uuid(self):
1335        return self._has_collection_uuid and \
1336            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1337
1338    def __enter__(self):
1339        return self
1340
1341    def __exit__(self, exc_type, exc_value, traceback):
1342        """Exit a context with this collection instance
1343
1344        If no exception was raised inside the context block, and this
1345        collection is writable and has a corresponding API record, that
1346        record will be updated to match the state of this instance at the end
1347        of the block.
1348        """
1349        if exc_type is None:
1350            if self.writable() and self._has_collection_uuid():
1351                self.save()
1352        self.stop_threads()
1353
1354    def stop_threads(self) -> None:
1355        """Stop background Keep upload/download threads"""
1356        if self._block_manager is not None:
1357            self._block_manager.stop_threads()
1358
1359    @synchronized
1360    def manifest_locator(self) -> Optional[str]:
1361        """Get this collection's manifest locator, if any
1362
1363        * If this collection instance is associated with an API record with a
1364          UUID, return that.
1365        * Otherwise, if this collection instance was loaded from an API record
1366          by portable data hash, return that.
1367        * Otherwise, return `None`.
1368        """
1369        return self._manifest_locator
1370
1371    @synchronized
1372    def clone(
1373            self,
1374            new_parent: Optional['Collection']=None,
1375            new_name: Optional[str]=None,
1376            readonly: bool=False,
1377            new_config: Optional[Mapping[str, str]]=None,
1378    ) -> 'Collection':
1379        """Create a Collection object with the same contents as this instance
1380
1381        This method creates a new Collection object with contents that match
1382        this instance's. The new collection will not be associated with any API
1383        record.
1384
1385        Arguments:
1386
1387        * new_parent: arvados.collection.Collection | None --- This value is
1388          passed to the new Collection's constructor as the `parent`
1389          argument.
1390
1391        * new_name: str | None --- This value is unused.
1392
1393        * readonly: bool --- If this value is true, this method constructs and
1394          returns a `CollectionReader`. Otherwise, it returns a mutable
1395          `Collection`. Default `False`.
1396
1397        * new_config: Mapping[str, str] | None --- This value is passed to the
1398          new Collection's constructor as `apiconfig`. If no value is provided,
1399          defaults to the configuration passed to this instance's constructor.
1400        """
1401        if new_config is None:
1402            new_config = self._config
1403        if readonly:
1404            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1405        else:
1406            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1407
1408        newcollection._clonefrom(self)
1409        return newcollection
1410
1411    @synchronized
1412    def api_response(self) -> Optional[Dict[str, Any]]:
1413        """Get this instance's associated API record
1414
1415        If this Collection instance has an associated API record, return it.
1416        Otherwise, return `None`.
1417        """
1418        return self._api_response
1419
1420    def find_or_create(
1421            self,
1422            path: str,
1423            create_type: CreateType,
1424    ) -> CollectionItem:
1425        if path == ".":
1426            return self
1427        else:
1428            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1429
1430    def find(self, path: str) -> CollectionItem:
1431        if path == ".":
1432            return self
1433        else:
1434            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1435
1436    def remove(self, path: str, recursive: bool=False) -> None:
1437        if path == ".":
1438            raise errors.ArgumentError("Cannot remove '.'")
1439        else:
1440            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1441
    @must_be_writable
    @synchronized
    @retry_method
    def save(
            self,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            merge: bool=True,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ) -> str:
        """Save collection to an existing API record

        This method updates the instance's corresponding API record to match
        the instance's state. If this instance does not have a corresponding API
        record yet, raises `AssertionError`. (To create a new API record, use
        `Collection.save_new`.) This method returns the saved collection
        manifest.

        Arguments:

        * properties: dict[str, Any] | None --- If provided, the API record will
          be updated with these properties. Note this will completely replace
          any existing properties.

        * storage_classes: list[str] | None --- If provided, the API record will
          be updated with this value in the `storage_classes_desired` field.
          This value will also be saved on the instance and used for any
          changes that follow.

        * trash_at: datetime.datetime | None --- If provided, the API record
          will be updated with this value in the `trash_at` field.

        * merge: bool --- If `True` (the default), this method will first
          reload this collection's API record, and merge any new contents into
          this instance before saving changes. See `Collection.update` for
          details.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed to directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        # Validate all arguments up front, before mutating any state or
        # contacting the API server.  Note: a falsy value (e.g. empty dict)
        # skips both the type check and the corresponding request field.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            # Remember the requested classes so later saves reuse them.
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        # Build the update request body from the provided metadata.
        body={}
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            # NOTE(review): the trailing "Z" labels this timestamp as UTC but
            # strftime does no timezone conversion — assumes trash_at is
            # already UTC; confirm callers pass UTC datetimes.
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            # There are local modifications: finish uploads and include the
            # new manifest in the update.
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            # Make sure every pending data block has been written to Keep.
            self._my_block_manager().commit_all()

            if merge:
                self.update()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))
            # Adopt the server's canonical manifest and hash.
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)
        elif body:
            # No content changes, but metadata was supplied: push just the
            # metadata update.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))

        return self._manifest_text
1549
1550
    @must_be_writable
    @synchronized
    @retry_method
    def save_new(
            self,
            name: Optional[str]=None,
            create_collection_record: bool=True,
            owner_uuid: Optional[str]=None,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            ensure_unique_name: bool=False,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ):
        """Save collection to a new API record

        This method finishes uploading new data blocks and (optionally)
        creates a new API collection record with the provided data. If a new
        record is created, this instance becomes associated with that record
        for future updates like `save()`. This method returns the saved
        collection manifest.

        Arguments:

        * name: str | None --- The `name` field to use on the new collection
          record. If not specified, a generic default name is generated.

        * create_collection_record: bool --- If `True` (the default), creates a
          collection record on the API server. If `False`, the method finishes
          all data uploads and only returns the resulting collection manifest
          without sending it to the API server.

        * owner_uuid: str | None --- The `owner_uuid` field to use on the
          new collection record.

        * properties: dict[str, Any] | None --- The `properties` field to use on
          the new collection record.

        * storage_classes: list[str] | None --- The
          `storage_classes_desired` field to use on the new collection record.

        * trash_at: datetime.datetime | None --- The `trash_at` field to use
          on the new collection record.

        * ensure_unique_name: bool --- This value is passed directly to the
          Arvados API when creating the collection record. If `True`, the API
          server may modify the submitted `name` to ensure the collection's
          `name`+`owner_uuid` combination is unique. If `False` (the default),
          if a collection already exists with this same `name`+`owner_uuid`
          combination, creating a collection record will raise a validation
          error.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed to directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        # Validate all arguments before doing any uploads or API calls.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False

        if storage_classes:
            self._storage_classes_desired = storage_classes

        # Finish all pending block uploads so the manifest is complete.
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                # Generic default name; let the server uniquify it.
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "name": name,
                    "replication_desired": self.replication_desired}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid
            if properties:
                body["properties"] = properties
            if self.storage_classes_desired():
                body["storage_classes_desired"] = self.storage_classes_desired()
            if trash_at:
                # NOTE(review): "Z" suffix assumes trash_at is UTC; strftime
                # performs no timezone conversion — confirm callers pass UTC.
                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                body["trash_at"] = t
            if preserve_version:
                body["preserve_version"] = preserve_version

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            # Adopt the server's canonical manifest and identifiers so this
            # instance now tracks the newly created record.
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]
            self._portable_data_hash = self._api_response["portable_data_hash"]

            self._manifest_text = text
            self.set_committed(True)

        return text
1668
    # Grammar pieces used by `_import_manifest`: a manifest is a sequence of
    # whitespace-separated tokens, each of which is a stream name, a block
    # locator, or a file segment descriptor.
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # Keep block locator: 32 hex digits, "+<size>", then optional "+hint" parts.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # File segment: "<stream offset>:<length>:<file name>".
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1672
1673    def _unescape_manifest_path(self, path):
1674        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1675
1676    @synchronized
1677    def _import_manifest(self, manifest_text):
1678        """Import a manifest into a `Collection`.
1679
1680        :manifest_text:
1681          The manifest text to import from.
1682
1683        """
1684        if len(self) > 0:
1685            raise ArgumentError("Can only import manifest into an empty collection")
1686
1687        STREAM_NAME = 0
1688        BLOCKS = 1
1689        SEGMENTS = 2
1690
1691        stream_name = None
1692        state = STREAM_NAME
1693
1694        for token_and_separator in self._token_re.finditer(manifest_text):
1695            tok = token_and_separator.group(1)
1696            sep = token_and_separator.group(2)
1697
1698            if state == STREAM_NAME:
1699                # starting a new stream
1700                stream_name = self._unescape_manifest_path(tok)
1701                blocks = []
1702                segments = []
1703                streamoffset = 0
1704                state = BLOCKS
1705                self.find_or_create(stream_name, COLLECTION)
1706                continue
1707
1708            if state == BLOCKS:
1709                block_locator = self._block_re.match(tok)
1710                if block_locator:
1711                    blocksize = int(block_locator.group(1))
1712                    blocks.append(Range(tok, streamoffset, blocksize, 0))
1713                    streamoffset += blocksize
1714                else:
1715                    state = SEGMENTS
1716
1717            if state == SEGMENTS:
1718                file_segment = self._segment_re.match(tok)
1719                if file_segment:
1720                    pos = int(file_segment.group(1))
1721                    size = int(file_segment.group(2))
1722                    name = self._unescape_manifest_path(file_segment.group(3))
1723                    if name.split('/')[-1] == '.':
1724                        # placeholder for persisting an empty directory, not a real file
1725                        if len(name) > 2:
1726                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1727                    else:
1728                        filepath = os.path.join(stream_name, name)
1729                        try:
1730                            afile = self.find_or_create(filepath, FILE)
1731                        except IOError as e:
1732                            if e.errno == errno.ENOTDIR:
1733                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1734                            else:
1735                                raise e from None
1736                        if isinstance(afile, ArvadosFile):
1737                            afile.add_segment(blocks, pos, size)
1738                        else:
1739                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1740                else:
1741                    # error!
1742                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1743
1744            if sep == "\n":
1745                stream_name = None
1746                state = STREAM_NAME
1747
1748        self.set_committed(True)
1749
1750    @synchronized
1751    def notify(
1752            self,
1753            event: ChangeType,
1754            collection: 'RichCollectionBase',
1755            name: str,
1756            item: CollectionItem,
1757    ) -> None:
1758        if self._callback:
1759            self._callback(event, collection, name, item)
1760
1761
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    A `Subcollection` stands for one stream (like a directory) inside a
    parent `Collection`. Instances are returned by `Collection.find` and
    expose the same API as the root collection. Anything that touches the
    API collection record is delegated up to the parent `Collection`.
    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock so the whole tree synchronizes
        # on a single mutex.
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self) -> 'Collection':
        """Return the `Collection` at the top of this stream's tree."""
        return self.parent.root_collection()

    def writable(self) -> bool:
        """A stream is writable exactly when its root collection is."""
        return self.root_collection().writable()

    def _my_api(self):
        # The API client belongs to the root collection.
        return self.root_collection()._my_api()

    def _my_keep(self):
        # The Keep client belongs to the root collection.
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        # Block management belongs to the root collection.
        return self.root_collection()._my_block_manager()

    def stream_name(self) -> str:
        """Return this stream's full path, rooted at the parent's stream name."""
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Create a copy of this stream under `new_parent` named `new_name`."""
        copied = Subcollection(new_parent, new_name)
        copied._clonefrom(self)
        return copied

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        # Flush pending changes, detach from the old parent, then attach
        # under the new parent and adopt its root's lock.
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Render manifest text, encoding an empty stream as a "." placeholder

        An empty directory has no real files, so it is persisted as a
        zero-length file whose escaped name is ``\\056`` (".").
        """
        if not self._items:
            return "%s %s 0:0:\\056\n" % (
                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
        return super(Subcollection, self)._get_manifest_text(
            stream_name, strip, normalize, only_committed)
1825
1826
class CollectionReader(Collection):
    """Read-only `Collection` subclass

    This class will never create or update any API collection records. You can
    use this class for additional code safety when you only need to read
    existing collections.
    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # `writable` reports True only while `_in_init` is set, so the base
        # class constructor can populate this instance from the manifest;
        # afterwards the instance is effectively read-only.
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatability with old CollectionReader
        # all_streams() and all_files()
        self._streams = None

    def writable(self) -> bool:
        # True only during __init__ (see above); False afterwards.
        return self._in_init

    def _populate_streams(orig_func):
        # Decorator applied at class-definition time (so `orig_func` is the
        # undecorated method): lazily parses the manifest into self._streams
        # before running the wrapped method.
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it creates a copy of the manifest.
            if self._streams is None:
                if self._manifest_text:
                    # One token list per non-empty manifest line (stream).
                    self._streams = [sline.split()
                                     for sline in self._manifest_text.split("\n")
                                     if sline]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def normalize(self):
        """Normalize the streams returned by `all_streams`"""
        # Group every file's locator ranges by (stream name, file name), then
        # rebuild self._streams in sorted, normalized order.
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_streams(self):
        """Return a `StreamReader` for each stream in the manifest."""
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_files(self):
        """Iterate over every file in every stream."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f
1893
1894
1895class CollectionWriter(CollectionBase):
1896    """Create a new collection from scratch
1897
1898    .. WARNING:: Deprecated
1899       This class is deprecated. Prefer `arvados.collection.Collection`
1900       instead.
1901    """
1902
1903    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1904    def __init__(self, api_client=None, num_retries=0, replication=None):
1905        """Instantiate a CollectionWriter.
1906
1907        CollectionWriter lets you build a new Arvados Collection from scratch.
1908        Write files to it.  The CollectionWriter will upload data to Keep as
1909        appropriate, and provide you with the Collection manifest text when
1910        you're finished.
1911
1912        Arguments:
1913        * api_client: The API client to use to look up Collections.  If not
1914          provided, CollectionReader will build one from available Arvados
1915          configuration.
1916        * num_retries: The default number of times to retry failed
1917          service requests.  Default 0.  You may change this value
1918          after instantiation, but note those changes may not
1919          propagate to related objects like the Keep client.
1920        * replication: The number of copies of each block to store.
1921          If this argument is None or not supplied, replication is
1922          the server-provided default if available, otherwise 2.
1923        """
1924        self._api_client = api_client
1925        self.num_retries = num_retries
1926        self.replication = (2 if replication is None else replication)
1927        self._keep_client = None
1928        self._data_buffer = []
1929        self._data_buffer_len = 0
1930        self._current_stream_files = []
1931        self._current_stream_length = 0
1932        self._current_stream_locators = []
1933        self._current_stream_name = '.'
1934        self._current_file_name = None
1935        self._current_file_pos = 0
1936        self._finished_streams = []
1937        self._close_file = None
1938        self._queued_file = None
1939        self._queued_dirents = deque()
1940        self._queued_trees = deque()
1941        self._last_open = None
1942
1943    def __exit__(self, exc_type, exc_value, traceback):
1944        if exc_type is None:
1945            self.finish()
1946
1947    def do_queued_work(self):
1948        # The work queue consists of three pieces:
1949        # * _queued_file: The file object we're currently writing to the
1950        #   Collection.
1951        # * _queued_dirents: Entries under the current directory
1952        #   (_queued_trees[0]) that we want to write or recurse through.
1953        #   This may contain files from subdirectories if
1954        #   max_manifest_depth == 0 for this directory.
1955        # * _queued_trees: Directories that should be written as separate
1956        #   streams to the Collection.
1957        # This function handles the smallest piece of work currently queued
1958        # (current file, then current directory, then next directory) until
1959        # no work remains.  The _work_THING methods each do a unit of work on
1960        # THING.  _queue_THING methods add a THING to the work queue.
1961        while True:
1962            if self._queued_file:
1963                self._work_file()
1964            elif self._queued_dirents:
1965                self._work_dirents()
1966            elif self._queued_trees:
1967                self._work_trees()
1968            else:
1969                break
1970
1971    def _work_file(self):
1972        while True:
1973            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
1974            if not buf:
1975                break
1976            self.write(buf)
1977        self.finish_current_file()
1978        if self._close_file:
1979            self._queued_file.close()
1980        self._close_file = None
1981        self._queued_file = None
1982
1983    def _work_dirents(self):
1984        path, stream_name, max_manifest_depth = self._queued_trees[0]
1985        if stream_name != self.current_stream_name():
1986            self.start_new_stream(stream_name)
1987        while self._queued_dirents:
1988            dirent = self._queued_dirents.popleft()
1989            target = os.path.join(path, dirent)
1990            if os.path.isdir(target):
1991                self._queue_tree(target,
1992                                 os.path.join(stream_name, dirent),
1993                                 max_manifest_depth - 1)
1994            else:
1995                self._queue_file(target, dirent)
1996                break
1997        if not self._queued_dirents:
1998            self._queued_trees.popleft()
1999
2000    def _work_trees(self):
2001        path, stream_name, max_manifest_depth = self._queued_trees[0]
2002        d = arvados.util.listdir_recursive(
2003            path, max_depth = (None if max_manifest_depth == 0 else 0))
2004        if d:
2005            self._queue_dirents(stream_name, d)
2006        else:
2007            self._queued_trees.popleft()
2008
2009    def _queue_file(self, source, filename=None):
2010        assert (self._queued_file is None), "tried to queue more than one file"
2011        if not hasattr(source, 'read'):
2012            source = open(source, 'rb')
2013            self._close_file = True
2014        else:
2015            self._close_file = False
2016        if filename is None:
2017            filename = os.path.basename(source.name)
2018        self.start_new_file(filename)
2019        self._queued_file = source
2020
2021    def _queue_dirents(self, stream_name, dirents):
2022        assert (not self._queued_dirents), "tried to queue more than one tree"
2023        self._queued_dirents = deque(sorted(dirents))
2024
2025    def _queue_tree(self, path, stream_name, max_manifest_depth):
2026        self._queued_trees.append((path, stream_name, max_manifest_depth))
2027
2028    def write_file(self, source, filename=None):
2029        self._queue_file(source, filename)
2030        self.do_queued_work()
2031
2032    def write_directory_tree(self,
2033                             path, stream_name='.', max_manifest_depth=-1):
2034        self._queue_tree(path, stream_name, max_manifest_depth)
2035        self.do_queued_work()
2036
2037    def write(self, newdata):
2038        if isinstance(newdata, bytes):
2039            pass
2040        elif isinstance(newdata, str):
2041            newdata = newdata.encode()
2042        elif hasattr(newdata, '__iter__'):
2043            for s in newdata:
2044                self.write(s)
2045            return
2046        self._data_buffer.append(newdata)
2047        self._data_buffer_len += len(newdata)
2048        self._current_stream_length += len(newdata)
2049        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2050            self.flush_data()
2051
2052    def open(self, streampath, filename=None):
2053        """open(streampath[, filename]) -> file-like object
2054
2055        Pass in the path of a file to write to the Collection, either as a
2056        single string or as two separate stream name and file name arguments.
2057        This method returns a file-like object you can write to add it to the
2058        Collection.
2059
2060        You may only have one file object from the Collection open at a time,
2061        so be sure to close the object when you're done.  Using the object in
2062        a with statement makes that easy:
2063
2064            with cwriter.open('./doc/page1.txt') as outfile:
2065                outfile.write(page1_data)
2066            with cwriter.open('./doc/page2.txt') as outfile:
2067                outfile.write(page2_data)
2068        """
2069        if filename is None:
2070            streampath, filename = split(streampath)
2071        if self._last_open and not self._last_open.closed:
2072            raise errors.AssertionError(
2073                u"can't open '{}' when '{}' is still open".format(
2074                    filename, self._last_open.name))
2075        if streampath != self.current_stream_name():
2076            self.start_new_stream(streampath)
2077        self.set_current_file_name(filename)
2078        self._last_open = _WriterFile(self, filename)
2079        return self._last_open
2080
2081    def flush_data(self):
2082        data_buffer = b''.join(self._data_buffer)
2083        if data_buffer:
2084            self._current_stream_locators.append(
2085                self._my_keep().put(
2086                    data_buffer[0:config.KEEP_BLOCK_SIZE],
2087                    copies=self.replication))
2088            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2089            self._data_buffer_len = len(self._data_buffer[0])
2090
2091    def start_new_file(self, newfilename=None):
2092        self.finish_current_file()
2093        self.set_current_file_name(newfilename)
2094
2095    def set_current_file_name(self, newfilename):
2096        if re.search(r'[\t\n]', newfilename):
2097            raise errors.AssertionError(
2098                "Manifest filenames cannot contain whitespace: %s" %
2099                newfilename)
2100        elif re.search(r'\x00', newfilename):
2101            raise errors.AssertionError(
2102                "Manifest filenames cannot contain NUL characters: %s" %
2103                newfilename)
2104        self._current_file_name = newfilename
2105
2106    def current_file_name(self):
2107        return self._current_file_name
2108
2109    def finish_current_file(self):
2110        if self._current_file_name is None:
2111            if self._current_file_pos == self._current_stream_length:
2112                return
2113            raise errors.AssertionError(
2114                "Cannot finish an unnamed file " +
2115                "(%d bytes at offset %d in '%s' stream)" %
2116                (self._current_stream_length - self._current_file_pos,
2117                 self._current_file_pos,
2118                 self._current_stream_name))
2119        self._current_stream_files.append([
2120                self._current_file_pos,
2121                self._current_stream_length - self._current_file_pos,
2122                self._current_file_name])
2123        self._current_file_pos = self._current_stream_length
2124        self._current_file_name = None
2125
2126    def start_new_stream(self, newstreamname='.'):
2127        self.finish_current_stream()
2128        self.set_current_stream_name(newstreamname)
2129
2130    def set_current_stream_name(self, newstreamname):
2131        if re.search(r'[\t\n]', newstreamname):
2132            raise errors.AssertionError(
2133                "Manifest stream names cannot contain whitespace: '%s'" %
2134                (newstreamname))
2135        self._current_stream_name = '.' if newstreamname=='' else newstreamname
2136
2137    def current_stream_name(self):
2138        return self._current_stream_name
2139
2140    def finish_current_stream(self):
2141        self.finish_current_file()
2142        self.flush_data()
2143        if not self._current_stream_files:
2144            pass
2145        elif self._current_stream_name is None:
2146            raise errors.AssertionError(
2147                "Cannot finish an unnamed stream (%d bytes in %d files)" %
2148                (self._current_stream_length, len(self._current_stream_files)))
2149        else:
2150            if not self._current_stream_locators:
2151                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2152            self._finished_streams.append([self._current_stream_name,
2153                                           self._current_stream_locators,
2154                                           self._current_stream_files])
2155        self._current_stream_files = []
2156        self._current_stream_length = 0
2157        self._current_stream_locators = []
2158        self._current_stream_name = None
2159        self._current_file_pos = 0
2160        self._current_file_name = None
2161
2162    def finish(self):
2163        """Store the manifest in Keep and return its locator.
2164
2165        This is useful for storing manifest fragments (task outputs)
2166        temporarily in Keep during a Crunch job.
2167
2168        In other cases you should make a collection instead, by
2169        sending manifest_text() to the API server's "create
2170        collection" endpoint.
2171        """
2172        return self._my_keep().put(self.manifest_text().encode(),
2173                                   copies=self.replication)
2174
2175    def portable_data_hash(self):
2176        stripped = self.stripped_manifest().encode()
2177        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
2178
2179    def manifest_text(self):
2180        self.finish_current_stream()
2181        manifest = ''
2182
2183        for stream in self._finished_streams:
2184            if not re.search(r'^\.(/.*)?$', stream[0]):
2185                manifest += './'
2186            manifest += stream[0].replace(' ', '\\040')
2187            manifest += ' ' + ' '.join(stream[1])
2188            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2189            manifest += "\n"
2190
2191        return manifest
2192
2193    def data_locators(self):
2194        ret = []
2195        for name, locators, files in self._finished_streams:
2196            ret += locators
2197        return ret
2198
2199    def save_new(self, name=None):
2200        return self._api_client.collections().create(
2201            ensure_unique_name=True,
2202            body={
2203                'name': name,
2204                'manifest_text': self.manifest_text(),
2205            }).execute(num_retries=self.num_retries)
2206
2207
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can serialize internal state to disk

    .. WARNING:: Deprecated
       This class is deprecated. Prefer `arvados.collection.Collection`
       instead.
    """

    # Attributes captured by dump_state and restored by from_state.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, **kwargs):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        """Build a writer from a state dict produced by `dump_state`.

        Try to build a new writer from scratch with the given state.
        If the state is not suitable to resume (because files have changed,
        been deleted, aren't predictable, etc.), raise a
        StaleWriterStateError.  Otherwise, return the initialized writer.
        The caller is responsible for calling writer.do_queued_work()
        appropriately after it's returned.
        """
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any dependency file changed on disk."""
        for path, orig_stat in listitems(self._dependencies):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(u"{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    u"failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError(u"{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a serializable dict of this writer's resumable state.

        `copy_func` is applied to each attribute value as it is collected
        (e.g. pass `copy.deepcopy` to snapshot mutable state).
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError(u"{} not a file path".format(source))
        # Capture any stat error in a name that survives the handler: in
        # Python 3 the `except ... as` target is unbound when the handler
        # exits, so referencing it later would raise NameError instead of
        # producing the intended error message.
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            stat_error = error
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                u"could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                u"{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Write data, requiring that it comes from a queued source file."""
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
ADD = 'add'
# Argument value for Collection methods to represent an added item

DEL = 'del'
# Argument value for Collection methods to represent a removed item

MOD = 'mod'
# Argument value for Collection methods to represent a modified item

TOK = 'tok'
# Argument value for Collection methods to represent an item with token differences

FILE = 'file'
# create_type value for Collection.find_or_create

COLLECTION = 'collection'
# create_type value for Collection.find_or_create

# Type aliases for Collection APIs. Forward references are written as
# strings so they resolve lazily (Collection is defined later in this file).
ChangeList = typing.List[typing.Union[
    typing.Tuple[typing.Literal['add', 'del'], str, 'Collection'],
    typing.Tuple[typing.Literal['mod', 'tok'], str, 'Collection', 'Collection'],
]]
ChangeType = typing.Literal['add', 'del', 'mod', 'tok']
CollectionItem = typing.Union['arvados.arvfile.ArvadosFile', 'Collection']
ChangeCallback = typing.Callable[
    [typing.Literal['add', 'del', 'mod', 'tok'],
     'Collection',
     str,
     typing.Union['arvados.arvfile.ArvadosFile', 'Collection']],
    object,
]
CreateType = typing.Literal['collection', 'file']
Properties = typing.Dict[str, typing.Any]
StorageClasses = typing.List[str]
class CollectionBase(object):
    """Abstract base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __enter__(self):
        """Enter a context block with this collection instance"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context block with this collection instance"""
        pass

    def _my_keep(self):
        # Lazily construct and cache a KeepClient bound to this collection's
        # API client and retry setting.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self) -> str:
        """Create a copy of the collection manifest with only size hints

        This method returns a string with the current collection's manifest
        text with all non-portable locator hints like permission hints and
        remote cluster hints removed. The only hints in the returned manifest
        will be size hints.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                # Keep the stream name, then strip every '+X...' hint that
                # does not start with a digit (size hints do) from each
                # Keep locator token.
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(arvados.util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)

Abstract base class for Collection classes

def stripped_manifest(self) -> str:
116    def stripped_manifest(self) -> str:
117        """Create a copy of the collection manifest with only size hints
118
119        This method returns a string with the current collection's manifest
120        text with all non-portable locator hints like permission hints and
121        remote cluster hints removed. The only hints in the returned manifest
122        will be size hints.
123        """
124        raw = self.manifest_text()
125        clean = []
126        for line in raw.split("\n"):
127            fields = line.split()
128            if fields:
129                clean_fields = fields[:1] + [
130                    (re.sub(r'\+[^\d][^\+]*', '', x)
131                     if re.match(arvados.util.keep_locator_pattern, x)
132                     else x)
133                    for x in fields[1:]]
134                clean += [' '.join(clean_fields), "\n"]
135        return ''.join(clean)

Create a copy of the collection manifest with only size hints

This method returns a string with the current collection’s manifest text with all non-portable locator hints like permission hints and remote cluster hints removed. The only hints in the returned manifest will be size hints.

class RichCollectionBase(CollectionBase):
 161class RichCollectionBase(CollectionBase):
 162    """Base class for Collection classes
 163
 164    .. ATTENTION:: Internal
 165       This class is meant to be used by other parts of the SDK. User code
 166       should instantiate or subclass `Collection` or one of its subclasses
 167       directly.
 168    """
 169
 170    def __init__(self, parent=None):
 171        self.parent = parent
 172        self._committed = False
 173        self._has_remote_blocks = False
 174        self._callback = None
 175        self._items = {}
 176
 177    def _my_api(self):
 178        raise NotImplementedError()
 179
 180    def _my_keep(self):
 181        raise NotImplementedError()
 182
 183    def _my_block_manager(self):
 184        raise NotImplementedError()
 185
 186    def writable(self) -> bool:
 187        """Indicate whether this collection object can be modified
 188
 189        This method returns `False` if this object is a `CollectionReader`,
 190        else `True`.
 191        """
 192        raise NotImplementedError()
 193
 194    def root_collection(self) -> 'Collection':
 195        """Get this collection's root collection object
 196
 197        If you open a subcollection with `Collection.find`, calling this method
 198        on that subcollection returns the source Collection object.
 199        """
 200        raise NotImplementedError()
 201
 202    def stream_name(self) -> str:
 203        """Get the name of the manifest stream represented by this collection
 204
 205        If you open a subcollection with `Collection.find`, calling this method
 206        on that subcollection returns the name of the stream you opened.
 207        """
 208        raise NotImplementedError()
 209
 210    @synchronized
 211    def has_remote_blocks(self) -> bool:
 212        """Indiciate whether the collection refers to remote data
 213
 214        Returns `True` if the collection manifest includes any Keep locators
 215        with a remote hint (`+R`), else `False`.
 216        """
 217        if self._has_remote_blocks:
 218            return True
 219        for item in self:
 220            if self[item].has_remote_blocks():
 221                return True
 222        return False
 223
 224    @synchronized
 225    def set_has_remote_blocks(self, val: bool) -> None:
 226        """Cache whether this collection refers to remote blocks
 227
 228        .. ATTENTION:: Internal
 229           This method is only meant to be used by other Collection methods.
 230
 231        Set this collection's cached "has remote blocks" flag to the given
 232        value.
 233        """
 234        self._has_remote_blocks = val
 235        if self.parent:
 236            self.parent.set_has_remote_blocks(val)
 237
 238    @must_be_writable
 239    @synchronized
 240    def find_or_create(
 241            self,
 242            path: str,
 243            create_type: CreateType,
 244    ) -> CollectionItem:
 245        """Get the item at the given path, creating it if necessary
 246
 247        If `path` refers to a stream in this collection, returns a
 248        corresponding `Subcollection` object. If `path` refers to a file in
 249        this collection, returns a corresponding
 250        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 251        this collection, then this method creates a new object and returns
 252        it, creating parent streams as needed. The type of object created is
 253        determined by the value of `create_type`.
 254
 255        Arguments:
 256
 257        * path: str --- The path to find or create within this collection.
 258
 259        * create_type: Literal[COLLECTION, FILE] --- The type of object to
 260          create at `path` if one does not exist. Passing `COLLECTION`
 261          creates a stream and returns the corresponding
 262          `Subcollection`. Passing `FILE` creates a new file and returns the
 263          corresponding `arvados.arvfile.ArvadosFile`.
 264        """
 265        pathcomponents = path.split("/", 1)
 266        if pathcomponents[0]:
 267            item = self._items.get(pathcomponents[0])
 268            if len(pathcomponents) == 1:
 269                if item is None:
 270                    # create new file
 271                    if create_type == COLLECTION:
 272                        item = Subcollection(self, pathcomponents[0])
 273                    else:
 274                        item = ArvadosFile(self, pathcomponents[0])
 275                    self._items[pathcomponents[0]] = item
 276                    self.set_committed(False)
 277                    self.notify(ADD, self, pathcomponents[0], item)
 278                return item
 279            else:
 280                if item is None:
 281                    # create new collection
 282                    item = Subcollection(self, pathcomponents[0])
 283                    self._items[pathcomponents[0]] = item
 284                    self.set_committed(False)
 285                    self.notify(ADD, self, pathcomponents[0], item)
 286                if isinstance(item, RichCollectionBase):
 287                    return item.find_or_create(pathcomponents[1], create_type)
 288                else:
 289                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 290        else:
 291            return self
 292
 293    @synchronized
 294    def find(self, path: str) -> CollectionItem:
 295        """Get the item at the given path
 296
 297        If `path` refers to a stream in this collection, returns a
 298        corresponding `Subcollection` object. If `path` refers to a file in
 299        this collection, returns a corresponding
 300        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
 301        this collection, then this method raises `NotADirectoryError`.
 302
 303        Arguments:
 304
 305        * path: str --- The path to find or create within this collection.
 306        """
 307        if not path:
 308            raise errors.ArgumentError("Parameter 'path' is empty.")
 309
 310        pathcomponents = path.split("/", 1)
 311        if pathcomponents[0] == '':
 312            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 313
 314        item = self._items.get(pathcomponents[0])
 315        if item is None:
 316            return None
 317        elif len(pathcomponents) == 1:
 318            return item
 319        else:
 320            if isinstance(item, RichCollectionBase):
 321                if pathcomponents[1]:
 322                    return item.find(pathcomponents[1])
 323                else:
 324                    return item
 325            else:
 326                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 327
 328    @synchronized
 329    def mkdirs(self, path: str) -> 'Subcollection':
 330        """Create and return a subcollection at `path`
 331
 332        If `path` exists within this collection, raises `FileExistsError`.
 333        Otherwise, creates a stream at that path and returns the
 334        corresponding `Subcollection`.
 335        """
 336        if self.find(path) != None:
 337            raise IOError(errno.EEXIST, "Directory or file exists", path)
 338
 339        return self.find_or_create(path, COLLECTION)
 340
 341    def open(
 342            self,
 343            path: str,
 344            mode: str="r",
 345            encoding: Optional[str]=None,
 346    ) -> IO:
 347        """Open a file-like object within the collection
 348
 349        This method returns a file-like object that can read and/or write the
 350        file located at `path` within the collection. If you attempt to write
 351        a `path` that does not exist, the file is created with `find_or_create`.
 352        If the file cannot be opened for any other reason, this method raises
 353        `OSError` with an appropriate errno.
 354
 355        Arguments:
 356
 357        * path: str --- The path of the file to open within this collection
 358
 359        * mode: str --- The mode to open this file. Supports all the same
 360          values as `builtins.open`.
 361
 362        * encoding: str | None --- The text encoding of the file. Only used
 363          when the file is opened in text mode. The default is
 364          platform-dependent.
 365        """
 366        if not re.search(r'^[rwa][bt]?\+?$', mode):
 367            raise errors.ArgumentError("Invalid mode {!r}".format(mode))
 368
 369        if mode[0] == 'r' and '+' not in mode:
 370            fclass = ArvadosFileReader
 371            arvfile = self.find(path)
 372        elif not self.writable():
 373            raise IOError(errno.EROFS, "Collection is read only")
 374        else:
 375            fclass = ArvadosFileWriter
 376            arvfile = self.find_or_create(path, FILE)
 377
 378        if arvfile is None:
 379            raise IOError(errno.ENOENT, "File not found", path)
 380        if not isinstance(arvfile, ArvadosFile):
 381            raise IOError(errno.EISDIR, "Is a directory", path)
 382
 383        if mode[0] == 'w':
 384            arvfile.truncate(0)
 385
 386        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
 387        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
 388        if 'b' not in mode:
 389            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
 390            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
 391        return f
 392
 393    def modified(self) -> bool:
 394        """Indicate whether this collection has an API server record
 395
 396        Returns `False` if this collection corresponds to a record loaded from
 397        the API server, `True` otherwise.
 398        """
 399        return not self.committed()
 400
 401    @synchronized
 402    def committed(self):
 403        """Indicate whether this collection has an API server record
 404
 405        Returns `True` if this collection corresponds to a record loaded from
 406        the API server, `False` otherwise.
 407        """
 408        return self._committed
 409
 410    @synchronized
 411    def set_committed(self, value: bool=True):
 412        """Cache whether this collection has an API server record
 413
 414        .. ATTENTION:: Internal
 415           This method is only meant to be used by other Collection methods.
 416
 417        Set this collection's cached "committed" flag to the given
 418        value and propagates it as needed.
 419        """
 420        if value == self._committed:
 421            return
 422        if value:
 423            for k,v in listitems(self._items):
 424                v.set_committed(True)
 425            self._committed = True
 426        else:
 427            self._committed = False
 428            if self.parent is not None:
 429                self.parent.set_committed(False)
 430
 431    @synchronized
 432    def __iter__(self) -> Iterator[str]:
 433        """Iterate names of streams and files in this collection
 434
 435        This method does not recurse. It only iterates the contents of this
 436        collection's corresponding stream.
 437        """
 438        return iter(viewkeys(self._items))
 439
 440    @synchronized
 441    def __getitem__(self, k: str) -> CollectionItem:
 442        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
 443
 444        This method does not recurse. If you want to search a path, use
 445        `RichCollectionBase.find` instead.
 446        """
 447        return self._items[k]
 448
 449    @synchronized
 450    def __contains__(self, k: str) -> bool:
 451        """Indicate whether this collection has an item with this name
 452
 453        This method does not recurse. It you want to check a path, use
 454        `RichCollectionBase.exists` instead.
 455        """
 456        return k in self._items
 457
 458    @synchronized
 459    def __len__(self):
 460        """Get the number of items directly contained in this collection
 461
 462        This method does not recurse. It only counts the streams and files
 463        in this collection's corresponding stream.
 464        """
 465        return len(self._items)
 466
 467    @must_be_writable
 468    @synchronized
 469    def __delitem__(self, p: str) -> None:
 470        """Delete an item from this collection's stream
 471
 472        This method does not recurse. If you want to remove an item by a
 473        path, use `RichCollectionBase.remove` instead.
 474        """
 475        del self._items[p]
 476        self.set_committed(False)
 477        self.notify(DEL, self, p, None)
 478
 479    @synchronized
 480    def keys(self) -> Iterator[str]:
 481        """Iterate names of streams and files in this collection
 482
 483        This method does not recurse. It only iterates the contents of this
 484        collection's corresponding stream.
 485        """
 486        return self._items.keys()
 487
 488    @synchronized
 489    def values(self) -> List[CollectionItem]:
 490        """Get a list of objects in this collection's stream
 491
 492        The return value includes a `Subcollection` for every stream, and an
 493        `arvados.arvfile.ArvadosFile` for every file, directly within this
 494        collection's stream.  This method does not recurse.
 495        """
 496        return listvalues(self._items)
 497
 498    @synchronized
 499    def items(self) -> List[Tuple[str, CollectionItem]]:
 500        """Get a list of `(name, object)` tuples from this collection's stream
 501
 502        The return value includes a `Subcollection` for every stream, and an
 503        `arvados.arvfile.ArvadosFile` for every file, directly within this
 504        collection's stream.  This method does not recurse.
 505        """
 506        return listitems(self._items)
 507
 508    def exists(self, path: str) -> bool:
 509        """Indicate whether this collection includes an item at `path`
 510
 511        This method returns `True` if `path` refers to a stream or file within
 512        this collection, else `False`.
 513
 514        Arguments:
 515
 516        * path: str --- The path to check for existence within this collection
 517        """
 518        return self.find(path) is not None
 519
 520    @must_be_writable
 521    @synchronized
 522    def remove(self, path: str, recursive: bool=False) -> None:
 523        """Remove the file or stream at `path`
 524
 525        Arguments:
 526
 527        * path: str --- The path of the item to remove from the collection
 528
 529        * recursive: bool --- Controls the method's behavior if `path` refers
 530          to a nonempty stream. If `False` (the default), this method raises
 531          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
 532          items under the stream.
 533        """
 534        if not path:
 535            raise errors.ArgumentError("Parameter 'path' is empty.")
 536
 537        pathcomponents = path.split("/", 1)
 538        item = self._items.get(pathcomponents[0])
 539        if item is None:
 540            raise IOError(errno.ENOENT, "File not found", path)
 541        if len(pathcomponents) == 1:
 542            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
 543                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
 544            deleteditem = self._items[pathcomponents[0]]
 545            del self._items[pathcomponents[0]]
 546            self.set_committed(False)
 547            self.notify(DEL, self, pathcomponents[0], deleteditem)
 548        else:
 549            item.remove(pathcomponents[1], recursive=recursive)
 550
 551    def _clonefrom(self, source):
 552        for k,v in listitems(source):
 553            self._items[k] = v.clone(self, k)
 554
 555    def clone(self):
 556        raise NotImplementedError()
 557
 558    @must_be_writable
 559    @synchronized
 560    def add(
 561            self,
 562            source_obj: CollectionItem,
 563            target_name: str,
 564            overwrite: bool=False,
 565            reparent: bool=False,
 566    ) -> None:
 567        """Copy or move a file or subcollection object to this collection
 568
 569        Arguments:
 570
 571        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
 572          to add to this collection
 573
 574        * target_name: str --- The path inside this collection where
 575          `source_obj` should be added.
 576
 577        * overwrite: bool --- Controls the behavior of this method when the
 578          collection already contains an object at `target_name`. If `False`
 579          (the default), this method will raise `FileExistsError`. If `True`,
 580          the object at `target_name` will be replaced with `source_obj`.
 581
 582        * reparent: bool --- Controls whether this method copies or moves
 583          `source_obj`. If `False` (the default), `source_obj` is copied into
 584          this collection. If `True`, `source_obj` is moved into this
 585          collection.
 586        """
 587        if target_name in self and not overwrite:
 588            raise IOError(errno.EEXIST, "File already exists", target_name)
 589
 590        modified_from = None
 591        if target_name in self:
 592            modified_from = self[target_name]
 593
 594        # Actually make the move or copy.
 595        if reparent:
 596            source_obj._reparent(self, target_name)
 597            item = source_obj
 598        else:
 599            item = source_obj.clone(self, target_name)
 600
 601        self._items[target_name] = item
 602        self.set_committed(False)
 603        if not self._has_remote_blocks and source_obj.has_remote_blocks():
 604            self.set_has_remote_blocks(True)
 605
 606        if modified_from:
 607            self.notify(MOD, self, target_name, (modified_from, item))
 608        else:
 609            self.notify(ADD, self, target_name, item)
 610
 611    def _get_src_target(self, source, target_path, source_collection, create_dest):
 612        if source_collection is None:
 613            source_collection = self
 614
 615        # Find the object
 616        if isinstance(source, basestring):
 617            source_obj = source_collection.find(source)
 618            if source_obj is None:
 619                raise IOError(errno.ENOENT, "File not found", source)
 620            sourcecomponents = source.split("/")
 621        else:
 622            source_obj = source
 623            sourcecomponents = None
 624
 625        # Find parent collection the target path
 626        targetcomponents = target_path.split("/")
 627
 628        # Determine the name to use.
 629        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
 630
 631        if not target_name:
 632            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 633
 634        if create_dest:
 635            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 636        else:
 637            if len(targetcomponents) > 1:
 638                target_dir = self.find("/".join(targetcomponents[0:-1]))
 639            else:
 640                target_dir = self
 641
 642        if target_dir is None:
 643            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 644
 645        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
 646            target_dir = target_dir[target_name]
 647            target_name = sourcecomponents[-1]
 648
 649        return (source_obj, target_dir, target_name)
 650
 651    @must_be_writable
 652    @synchronized
 653    def copy(
 654            self,
 655            source: Union[str, CollectionItem],
 656            target_path: str,
 657            source_collection: Optional['RichCollectionBase']=None,
 658            overwrite: bool=False,
 659    ) -> None:
 660        """Copy a file or subcollection object to this collection
 661
 662        Arguments:
 663
 664        * source: str | arvados.arvfile.ArvadosFile |
 665          arvados.collection.Subcollection --- The file or subcollection to
 666          add to this collection. If `source` is a str, the object will be
 667          found by looking up this path from `source_collection` (see
 668          below).
 669
 670        * target_path: str --- The path inside this collection where the
 671          source object should be added.
 672
 673        * source_collection: arvados.collection.Collection | None --- The
 674          collection to find the source object from when `source` is a
 675          path. Defaults to the current collection (`self`).
 676
 677        * overwrite: bool --- Controls the behavior of this method when the
 678          collection already contains an object at `target_path`. If `False`
 679          (the default), this method will raise `FileExistsError`. If `True`,
 680          the object at `target_path` will be replaced with `source_obj`.
 681        """
 682        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 683        target_dir.add(source_obj, target_name, overwrite, False)
 684
 685    @must_be_writable
 686    @synchronized
 687    def rename(
 688            self,
 689            source: Union[str, CollectionItem],
 690            target_path: str,
 691            source_collection: Optional['RichCollectionBase']=None,
 692            overwrite: bool=False,
 693    ) -> None:
 694        """Move a file or subcollection object to this collection
 695
 696        Arguments:
 697
 698        * source: str | arvados.arvfile.ArvadosFile |
 699          arvados.collection.Subcollection --- The file or subcollection to
 700          add to this collection. If `source` is a str, the object will be
 701          found by looking up this path from `source_collection` (see
 702          below).
 703
 704        * target_path: str --- The path inside this collection where the
 705          source object should be added.
 706
 707        * source_collection: arvados.collection.Collection | None --- The
 708          collection to find the source object from when `source` is a
 709          path. Defaults to the current collection (`self`).
 710
 711        * overwrite: bool --- Controls the behavior of this method when the
 712          collection already contains an object at `target_path`. If `False`
 713          (the default), this method will raise `FileExistsError`. If `True`,
 714          the object at `target_path` will be replaced with `source_obj`.
 715        """
 716        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 717        if not source_obj.writable():
 718            raise IOError(errno.EROFS, "Source collection is read only", source)
 719        target_dir.add(source_obj, target_name, overwrite, True)
 720
 721    def portable_manifest_text(self, stream_name: str=".") -> str:
 722        """Get the portable manifest text for this collection
 723
 724        The portable manifest text is normalized, and does not include access
 725        tokens. This method does not flush outstanding blocks to Keep.
 726
 727        Arguments:
 728
 729        * stream_name: str --- The name to use for this collection's stream in
 730          the generated manifest. Default `'.'`.
 731        """
 732        return self._get_manifest_text(stream_name, True, True)
 733
 734    @synchronized
 735    def manifest_text(
 736            self,
 737            stream_name: str=".",
 738            strip: bool=False,
 739            normalize: bool=False,
 740            only_committed: bool=False,
 741    ) -> str:
 742        """Get the manifest text for this collection
 743
 744        Arguments:
 745
 746        * stream_name: str --- The name to use for this collection's stream in
 747          the generated manifest. Default `'.'`.
 748
 749        * strip: bool --- Controls whether or not the returned manifest text
 750          includes access tokens. If `False` (the default), the manifest text
 751          will include access tokens. If `True`, the manifest text will not
 752          include access tokens.
 753
 754        * normalize: bool --- Controls whether or not the returned manifest
 755          text is normalized. Default `False`.
 756
 757        * only_committed: bool --- Controls whether or not this method uploads
 758          pending data to Keep before building and returning the manifest text.
 759          If `False` (the default), this method will finish uploading all data
 760          to Keep, then return the final manifest. If `True`, this method will
 761          build and return a manifest that only refers to the data that has
 762          finished uploading at the time this method was called.
 763        """
 764        if not only_committed:
 765            self._my_block_manager().commit_all()
 766        return self._get_manifest_text(stream_name, strip, normalize,
 767                                       only_committed=only_committed)
 768
    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.

        """

        if not self.committed() or self._manifest_text is None or normalize:
            # Regenerate the manifest from the in-memory state.
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            # Collect segment lists for every file directly in this stream.
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # Segment still points at an in-memory buffer block.
                        if only_committed:
                            continue
                        # Use the permanent locator assigned at commit time.
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                         segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            # Recurse into subdirectories; their text is always normalized.
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(
                    stream_name=os.path.join(stream_name, dirname),
                    strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            # Collection is unmodified: reuse the cached manifest text.
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text
 822
 823    @synchronized
 824    def _copy_remote_blocks(self, remote_blocks={}):
 825        """Scan through the entire collection and ask Keep to copy remote blocks.
 826
 827        When accessing a remote collection, blocks will have a remote signature
 828        (+R instead of +A). Collect these signatures and request Keep to copy the
 829        blocks to the local cluster, returning local (+A) signatures.
 830
 831        :remote_blocks:
 832          Shared cache of remote to local block mappings. This is used to avoid
 833          doing extra work when blocks are shared by more than one file in
 834          different subdirectories.
 835
 836        """
 837        for item in self:
 838            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
 839        return remote_blocks
 840
    @synchronized
    def diff(
            self,
            end_collection: 'RichCollectionBase',
            prefix: str=".",
            holding_collection: Optional['Collection']=None,
    ) -> ChangeList:
        """Build a list of differences between this collection and another

        Arguments:

        * end_collection: arvados.collection.RichCollectionBase --- A
          collection object with the desired end state. The returned diff
          list will describe how to go from the current collection object
          `self` to `end_collection`.

        * prefix: str --- The name to use for this collection's stream in
          the diff list. Default `'.'`.

        * holding_collection: arvados.collection.Collection | None --- A
          collection object used to hold objects for the returned diff
          list. By default, a new empty collection is created.
        """
        changes = []
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        # Pass 1: anything present here but absent from end_collection is a DEL.
        for k in self:
            if k not in end_collection:
               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        # Pass 2: classify each entry of end_collection.
        for k in end_collection:
            if k in self:
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    # Both sides are directories: recurse.
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    # Content differs: record a modification with both states.
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                else:
                    # Content is equal; TOK records a token-only change.
                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                # Present only in end_collection: record an addition.
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        return changes
 881
    @must_be_writable
    @synchronized
    def apply(self, changes: ChangeList) -> None:
        """Apply a list of changes to this collection

        This method takes a list of changes generated by
        `RichCollectionBase.diff` and applies it to this
        collection. Afterward, the state of this collection object will
        match the state of `end_collection` passed to `diff`. If a change
        conflicts with a local change, it will be saved to an alternate path
        indicating the conflict.

        Arguments:

        * changes: arvados.collection.ChangeList --- The list of differences
          generated by `RichCollectionBase.diff`.
        """
        if changes:
            # Applying any change diverges this collection from its saved record.
            self.set_committed(False)
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            # Timestamped alternate path used when a change collides with a
            # concurrent local edit.
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
 938
 939    def portable_data_hash(self) -> str:
 940        """Get the portable data hash for this collection's manifest"""
 941        if self._manifest_locator and self.committed():
 942            # If the collection is already saved on the API server, and it's committed
 943            # then return API server's PDH response.
 944            return self._portable_data_hash
 945        else:
 946            stripped = self.portable_manifest_text().encode()
 947            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 948
 949    @synchronized
 950    def subscribe(self, callback: ChangeCallback) -> None:
 951        """Set a notify callback for changes to this collection
 952
 953        Arguments:
 954
 955        * callback: arvados.collection.ChangeCallback --- The callable to
 956          call each time the collection is changed.
 957        """
 958        if self._callback is None:
 959            self._callback = callback
 960        else:
 961            raise errors.ArgumentError("A callback is already set on this collection.")
 962
 963    @synchronized
 964    def unsubscribe(self) -> None:
 965        """Remove any notify callback set for changes to this collection"""
 966        if self._callback is not None:
 967            self._callback = None
 968
 969    @synchronized
 970    def notify(
 971            self,
 972            event: ChangeType,
 973            collection: 'RichCollectionBase',
 974            name: str,
 975            item: CollectionItem,
 976    ) -> None:
 977        """Notify any subscribed callback about a change to this collection
 978
 979        .. ATTENTION:: Internal
 980           This method is only meant to be used by other Collection methods.
 981
 982        If a callback has been registered with `RichCollectionBase.subscribe`,
 983        it will be called with information about a change to this collection.
 984        Then this notification will be propagated to this collection's root.
 985
 986        Arguments:
 987
 988        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
 989          the collection.
 990
 991        * collection: arvados.collection.RichCollectionBase --- The
 992          collection that was modified.
 993
 994        * name: str --- The name of the file or stream within `collection` that
 995          was modified.
 996
 997        * item: arvados.arvfile.ArvadosFile |
 998          arvados.collection.Subcollection --- The new contents at `name`
 999          within `collection`.
1000        """
1001        if self._callback:
1002            self._callback(event, collection, name, item)
1003        self.root_collection().notify(event, collection, name, item)
1004
1005    @synchronized
1006    def __eq__(self, other: Any) -> bool:
1007        """Indicate whether this collection object is equal to another"""
1008        if other is self:
1009            return True
1010        if not isinstance(other, RichCollectionBase):
1011            return False
1012        if len(self._items) != len(other):
1013            return False
1014        for k in self._items:
1015            if k not in other:
1016                return False
1017            if self._items[k] != other[k]:
1018                return False
1019        return True
1020
1021    def __ne__(self, other: Any) -> bool:
1022        """Indicate whether this collection object is not equal to another"""
1023        return not self.__eq__(other)
1024
1025    @synchronized
1026    def flush(self) -> None:
1027        """Upload any pending data to Keep"""
1028        for e in listvalues(self):
1029            e.flush()

Base class for Collection classes

RichCollectionBase(parent=None)
170    def __init__(self, parent=None):
171        self.parent = parent
172        self._committed = False
173        self._has_remote_blocks = False
174        self._callback = None
175        self._items = {}
parent
def writable(self) -> bool:
186    def writable(self) -> bool:
187        """Indicate whether this collection object can be modified
188
189        This method returns `False` if this object is a `CollectionReader`,
190        else `True`.
191        """
192        raise NotImplementedError()

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

def root_collection(self) -> Collection:
194    def root_collection(self) -> 'Collection':
195        """Get this collection's root collection object
196
197        If you open a subcollection with `Collection.find`, calling this method
198        on that subcollection returns the source Collection object.
199        """
200        raise NotImplementedError()

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def stream_name(self) -> str:
202    def stream_name(self) -> str:
203        """Get the name of the manifest stream represented by this collection
204
205        If you open a subcollection with `Collection.find`, calling this method
206        on that subcollection returns the name of the stream you opened.
207        """
208        raise NotImplementedError()

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

@synchronized
def has_remote_blocks(self) -> bool:
    @synchronized
    def has_remote_blocks(self) -> bool:
        """Indicate whether the collection refers to remote data

        Returns `True` if the collection manifest includes any Keep locators
        with a remote hint (`+R`), else `False`.
        """
        # Fast path: this node already recorded a remote locator.
        if self._has_remote_blocks:
            return True
        # Otherwise ask each child; any remote block anywhere counts.
        for item in self:
            if self[item].has_remote_blocks():
                return True
        return False

Indicate whether the collection refers to remote data

Returns True if the collection manifest includes any Keep locators with a remote hint (+R), else False.

@synchronized
def set_has_remote_blocks(self, val: bool) -> None:
224    @synchronized
225    def set_has_remote_blocks(self, val: bool) -> None:
226        """Cache whether this collection refers to remote blocks
227
228        .. ATTENTION:: Internal
229           This method is only meant to be used by other Collection methods.
230
231        Set this collection's cached "has remote blocks" flag to the given
232        value.
233        """
234        self._has_remote_blocks = val
235        if self.parent:
236            self.parent.set_has_remote_blocks(val)

Cache whether this collection refers to remote blocks

Set this collection’s cached “has remote blocks” flag to the given value.

@must_be_writable
@synchronized
def find_or_create( self, path: str, create_type: Literal['collection', 'file']) -> Union[arvados.arvfile.ArvadosFile, Collection]:
    @must_be_writable
    @synchronized
    def find_or_create(
            self,
            path: str,
            create_type: CreateType,
    ) -> CollectionItem:
        """Get the item at the given path, creating it if necessary

        If `path` refers to a stream in this collection, returns a
        corresponding `Subcollection` object. If `path` refers to a file in
        this collection, returns a corresponding
        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
        this collection, then this method creates a new object and returns
        it, creating parent streams as needed. The type of object created is
        determined by the value of `create_type`.

        Arguments:

        * path: str --- The path to find or create within this collection.

        * create_type: Literal[COLLECTION, FILE] --- The type of object to
          create at `path` if one does not exist. Passing `COLLECTION`
          creates a stream and returns the corresponding
          `Subcollection`. Passing `FILE` creates a new file and returns the
          corresponding `arvados.arvfile.ArvadosFile`.
        """
        # Split off the first path component; the remainder (if any) is
        # handled by recursing into the child collection.
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                # Last component: create the requested object here if missing.
                if item is None:
                    # create new file
                    if create_type == COLLECTION:
                        item = Subcollection(self, pathcomponents[0])
                    else:
                        item = ArvadosFile(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # create new collection
                    item = Subcollection(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    # Recurse for the remaining path inside the subcollection.
                    return item.find_or_create(pathcomponents[1], create_type)
                else:
                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        else:
            # Empty first component (path was "" or began with "/"): this node.
            return self

Get the item at the given path, creating it if necessary

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of create_type.

Arguments:

  • path: str — The path to find or create within this collection.

  • create_type: Literal[COLLECTION, FILE] — The type of object to create at path if one does not exist. Passing COLLECTION creates a stream and returns the corresponding Subcollection. Passing FILE creates a new file and returns the corresponding arvados.arvfile.ArvadosFile.

@synchronized
def find( self, path: str) -> Union[arvados.arvfile.ArvadosFile, Collection]:
293    @synchronized
294    def find(self, path: str) -> CollectionItem:
295        """Get the item at the given path
296
297        If `path` refers to a stream in this collection, returns a
298        corresponding `Subcollection` object. If `path` refers to a file in
299        this collection, returns a corresponding
300        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
301        this collection, then this method raises `NotADirectoryError`.
302
303        Arguments:
304
305        * path: str --- The path to find or create within this collection.
306        """
307        if not path:
308            raise errors.ArgumentError("Parameter 'path' is empty.")
309
310        pathcomponents = path.split("/", 1)
311        if pathcomponents[0] == '':
312            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
313
314        item = self._items.get(pathcomponents[0])
315        if item is None:
316            return None
317        elif len(pathcomponents) == 1:
318            return item
319        else:
320            if isinstance(item, RichCollectionBase):
321                if pathcomponents[1]:
322                    return item.find(pathcomponents[1])
323                else:
324                    return item
325            else:
326                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

Get the item at the given path

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method raises NotADirectoryError.

Arguments:

  • path: str — The path to find or create within this collection.
@synchronized
def mkdirs(self, path: str) -> Subcollection:
328    @synchronized
329    def mkdirs(self, path: str) -> 'Subcollection':
330        """Create and return a subcollection at `path`
331
332        If `path` exists within this collection, raises `FileExistsError`.
333        Otherwise, creates a stream at that path and returns the
334        corresponding `Subcollection`.
335        """
336        if self.find(path) != None:
337            raise IOError(errno.EEXIST, "Directory or file exists", path)
338
339        return self.find_or_create(path, COLLECTION)

Create and return a subcollection at path

If path exists within this collection, raises FileExistsError. Otherwise, creates a stream at that path and returns the corresponding Subcollection.

def open(self, path: str, mode: str = 'r', encoding: Optional[str] = None) -> IO:
    def open(
            self,
            path: str,
            mode: str="r",
            encoding: Optional[str]=None,
    ) -> IO:
        """Open a file-like object within the collection

        This method returns a file-like object that can read and/or write the
        file located at `path` within the collection. If you attempt to write
        a `path` that does not exist, the file is created with `find_or_create`.
        If the file cannot be opened for any other reason, this method raises
        `OSError` with an appropriate errno.

        Arguments:

        * path: str --- The path of the file to open within this collection

        * mode: str --- The mode to open this file. Supports all the same
          values as `builtins.open`.

        * encoding: str | None --- The text encoding of the file. Only used
          when the file is opened in text mode. The default is
          platform-dependent.
        """
        # Accept only r/w/a, optional b or t, optional '+', as builtins.open does.
        if not re.search(r'^[rwa][bt]?\+?$', mode):
            raise errors.ArgumentError("Invalid mode {!r}".format(mode))

        if mode[0] == 'r' and '+' not in mode:
            # Plain read mode: the file must already exist.
            fclass = ArvadosFileReader
            arvfile = self.find(path)
        elif not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        else:
            # Any writable mode: create the file on demand.
            fclass = ArvadosFileWriter
            arvfile = self.find_or_create(path, FILE)

        if arvfile is None:
            raise IOError(errno.ENOENT, "File not found", path)
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory", path)

        if mode[0] == 'w':
            # 'w' truncates any existing content before writing.
            arvfile.truncate(0)

        # The underlying Arvados file object is binary; build a binary mode
        # string, then wrap with a TextIOWrapper below for text modes.
        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
        if 'b' not in mode:
            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
        return f

Open a file-like object within the collection

This method returns a file-like object that can read and/or write the file located at path within the collection. If you attempt to write a path that does not exist, the file is created with find_or_create. If the file cannot be opened for any other reason, this method raises OSError with an appropriate errno.

Arguments:

  • path: str — The path of the file to open within this collection

  • mode: str — The mode to open this file. Supports all the same values as builtins.open.

  • encoding: str | None — The text encoding of the file. Only used when the file is opened in text mode. The default is platform-dependent.

def modified(self) -> bool:
393    def modified(self) -> bool:
394        """Indicate whether this collection has an API server record
395
396        Returns `False` if this collection corresponds to a record loaded from
397        the API server, `True` otherwise.
398        """
399        return not self.committed()

Indicate whether this collection has an API server record

Returns False if this collection corresponds to a record loaded from the API server, True otherwise.

@synchronized
def committed(self):
401    @synchronized
402    def committed(self):
403        """Indicate whether this collection has an API server record
404
405        Returns `True` if this collection corresponds to a record loaded from
406        the API server, `False` otherwise.
407        """
408        return self._committed

Indicate whether this collection has an API server record

Returns True if this collection corresponds to a record loaded from the API server, False otherwise.

@synchronized
def set_committed(self, value: bool = True):
410    @synchronized
411    def set_committed(self, value: bool=True):
412        """Cache whether this collection has an API server record
413
414        .. ATTENTION:: Internal
415           This method is only meant to be used by other Collection methods.
416
417        Set this collection's cached "committed" flag to the given
418        value and propagates it as needed.
419        """
420        if value == self._committed:
421            return
422        if value:
423            for k,v in listitems(self._items):
424                v.set_committed(True)
425            self._committed = True
426        else:
427            self._committed = False
428            if self.parent is not None:
429                self.parent.set_committed(False)

Cache whether this collection has an API server record

Set this collection’s cached “committed” flag to the given value and propagates it as needed.

@synchronized
def keys(self) -> Iterator[str]:
    """Iterate names of streams and files in this collection

    This method does not recurse. It only iterates the contents of this
    collection's corresponding stream.
    """
    # Names of the immediate children only (dict key view).
    children = self._items
    return children.keys()

Iterate names of streams and files in this collection

This method does not recurse. It only iterates the contents of this collection’s corresponding stream.

@synchronized
def values(self) -> List[CollectionItem]:
    """Get a list of objects in this collection's stream

    The return value includes a `Subcollection` for every stream, and an
    `arvados.arvfile.ArvadosFile` for every file, directly within this
    collection's stream.  This method does not recurse.
    """
    # list() gives callers a snapshot they can iterate or mutate safely.
    # (Replaces the Python 2 compatibility shim future.utils.listvalues.)
    return list(self._items.values())

Get a list of objects in this collection’s stream

The return value includes a Subcollection for every stream, and an arvados.arvfile.ArvadosFile for every file, directly within this collection’s stream. This method does not recurse.

@synchronized
def items(self) -> List[Tuple[str, CollectionItem]]:
    """Get a list of `(name, object)` tuples from this collection's stream

    The return value includes a `Subcollection` for every stream, and an
    `arvados.arvfile.ArvadosFile` for every file, directly within this
    collection's stream.  This method does not recurse.
    """
    # list() gives callers a snapshot they can iterate or mutate safely.
    # (Replaces the Python 2 compatibility shim future.utils.listitems.)
    return list(self._items.items())

Get a list of (name, object) tuples from this collection’s stream

The return value includes a Subcollection for every stream, and an arvados.arvfile.ArvadosFile for every file, directly within this collection’s stream. This method does not recurse.

def exists(self, path: str) -> bool:
    """Indicate whether this collection includes an item at `path`

    This method returns `True` if `path` refers to a stream or file within
    this collection, else `False`.

    Arguments:

    * path: str --- The path to check for existence within this collection
    """
    found = self.find(path)
    return found is not None

Indicate whether this collection includes an item at path

This method returns True if path refers to a stream or file within this collection, else False.

Arguments:

  • path: str — The path to check for existence within this collection
@must_be_writable
@synchronized
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the file or stream at `path`

    Arguments:

    * path: str --- The path of the item to remove from the collection

    * recursive: bool --- Controls the method's behavior if `path` refers
      to a nonempty stream. If `False` (the default), this method raises
      `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
      items under the stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    # Split off the first path component; `sep` is non-empty iff the
    # path descends into a subcollection.
    name, sep, rest = path.partition("/")
    item = self._items.get(name)
    if item is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if sep:
        # Delegate removal of the remainder to the child stream.
        item.remove(rest, recursive=recursive)
    else:
        # Refuse to delete a non-empty stream unless recursive was requested.
        if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
            raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
        del self._items[name]
        self.set_committed(False)
        self.notify(DEL, self, name, item)

Remove the file or stream at path

Arguments:

  • path: str — The path of the item to remove from the collection

  • recursive: bool — Controls the method’s behavior if path refers to a nonempty stream. If False (the default), this method raises OSError with errno ENOTEMPTY. If True, this method removes all items under the stream.

def clone(self):
    """Abstract: concrete subclasses must provide their own clone()."""
    raise NotImplementedError()
@must_be_writable
@synchronized
def add(
        self,
        source_obj: CollectionItem,
        target_name: str,
        overwrite: bool=False,
        reparent: bool=False,
) -> None:
    """Copy or move a file or subcollection object to this collection

    Arguments:

    * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
      to add to this collection

    * target_name: str --- The path inside this collection where
      `source_obj` should be added.

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_name`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_name` will be replaced with `source_obj`.

    * reparent: bool --- Controls whether this method copies or moves
      `source_obj`. If `False` (the default), `source_obj` is copied into
      this collection. If `True`, `source_obj` is moved into this
      collection.
    """
    if target_name in self and not overwrite:
        raise IOError(errno.EEXIST, "File already exists", target_name)

    # Remember what (if anything) we are replacing, for the notification.
    previous = self[target_name] if target_name in self else None

    # Perform the actual move or copy.
    if reparent:
        source_obj._reparent(self, target_name)
        placed = source_obj
    else:
        placed = source_obj.clone(self, target_name)

    self._items[target_name] = placed
    self.set_committed(False)
    if not self._has_remote_blocks and source_obj.has_remote_blocks():
        self.set_has_remote_blocks(True)

    if previous:
        self.notify(MOD, self, target_name, (previous, placed))
    else:
        self.notify(ADD, self, target_name, placed)

Copy or move a file or subcollection object to this collection

Arguments:

  • source_obj: arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection

  • target_name: str — The path inside this collection where source_obj should be added.

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_name. If False (the default), this method will raise FileExistsError. If True, the object at target_name will be replaced with source_obj.

  • reparent: bool — Controls whether this method copies or moves source_obj. If False (the default), source_obj is copied into this collection. If True, source_obj is moved into this collection.

@must_be_writable
@synchronized
def copy(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Copy a file or subcollection object to this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      add to this collection. If `source` is a str, the object will be
      found by looking up this path from `source_collection` (see
      below).

    * target_path: str --- The path inside this collection where the
      source object should be added.

    * source_collection: arvados.collection.Collection | None --- The
      collection to find the source object from when `source` is a
      path. Defaults to the current collection (`self`).

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_path`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_path` will be replaced with `source_obj`.
    """
    # Resolve source object and destination stream (source_is_dst=True),
    # then add with reparent=False to make this a copy.
    obj, dest_dir, dest_name = self._get_src_target(source, target_path, source_collection, True)
    dest_dir.add(obj, dest_name, overwrite, False)

Copy a file or subcollection object to this collection

Arguments:

  • source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If source is a str, the object will be found by looking up this path from source_collection (see below).

  • target_path: str — The path inside this collection where the source object should be added.

  • source_collection: Collection | None — The collection to find the source object from when source is a path. Defaults to the current collection (self).

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_path. If False (the default), this method will raise FileExistsError. If True, the object at target_path will be replaced with source_obj.

@must_be_writable
@synchronized
def rename(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Move a file or subcollection object to this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      add to this collection. If `source` is a str, the object will be
      found by looking up this path from `source_collection` (see
      below).

    * target_path: str --- The path inside this collection where the
      source object should be added.

    * source_collection: arvados.collection.Collection | None --- The
      collection to find the source object from when `source` is a
      path. Defaults to the current collection (`self`).

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_path`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_path` will be replaced with `source_obj`.
    """
    obj, dest_dir, dest_name = self._get_src_target(source, target_path, source_collection, False)
    # A move mutates the source, so the source must be writable.
    if not obj.writable():
        raise IOError(errno.EROFS, "Source collection is read only", source)
    # reparent=True makes add() move rather than copy.
    dest_dir.add(obj, dest_name, overwrite, True)

Move a file or subcollection object to this collection

Arguments:

  • source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If source is a str, the object will be found by looking up this path from source_collection (see below).

  • target_path: str — The path inside this collection where the source object should be added.

  • source_collection: Collection | None — The collection to find the source object from when source is a path. Defaults to the current collection (self).

  • overwrite: bool — Controls the behavior of this method when the collection already contains an object at target_path. If False (the default), this method will raise FileExistsError. If True, the object at target_path will be replaced with source_obj.

def portable_manifest_text(self, stream_name: str=".") -> str:
    """Get the portable manifest text for this collection

    The portable manifest text is normalized, and does not include access
    tokens. This method does not flush outstanding blocks to Keep.

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.
    """
    # strip=True removes access tokens; normalize=True for portability.
    strip_tokens = True
    normalized = True
    return self._get_manifest_text(stream_name, strip_tokens, normalized)

Get the portable manifest text for this collection

The portable manifest text is normalized, and does not include access tokens. This method does not flush outstanding blocks to Keep.

Arguments:

  • stream_name: str — The name to use for this collection’s stream in the generated manifest. Default '.'.
@synchronized
def manifest_text(
        self,
        stream_name: str=".",
        strip: bool=False,
        normalize: bool=False,
        only_committed: bool=False,
) -> str:
    """Get the manifest text for this collection

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.

    * strip: bool --- Controls whether or not the returned manifest text
      includes access tokens. If `False` (the default), the manifest text
      will include access tokens. If `True`, the manifest text will not
      include access tokens.

    * normalize: bool --- Controls whether or not the returned manifest
      text is normalized. Default `False`.

    * only_committed: bool --- Controls whether or not this method uploads
      pending data to Keep before building and returning the manifest text.
      If `False` (the default), this method will finish uploading all data
      to Keep, then return the final manifest. If `True`, this method will
      build and return a manifest that only refers to the data that has
      finished uploading at the time this method was called.
    """
    if not only_committed:
        # Flush pending writes so the manifest references final blocks.
        self._my_block_manager().commit_all()
    return self._get_manifest_text(
        stream_name, strip, normalize, only_committed=only_committed)

Get the manifest text for this collection

Arguments:

  • stream_name: str — The name to use for this collection’s stream in the generated manifest. Default '.'.

  • strip: bool — Controls whether or not the returned manifest text includes access tokens. If False (the default), the manifest text will include access tokens. If True, the manifest text will not include access tokens.

  • normalize: bool — Controls whether or not the returned manifest text is normalized. Default False.

  • only_committed: bool — Controls whether or not this method uploads pending data to Keep before building and returning the manifest text. If False (the default), this method will finish uploading all data to Keep, then return the final manifest. If True, this method will build and return a manifest that only refers to the data that has finished uploading at the time this method was called.

@synchronized
def diff(
        self,
        end_collection: 'RichCollectionBase',
        prefix: str=".",
        holding_collection: Optional['Collection']=None,
) -> ChangeList:
    """Build a list of differences between this collection and another

    Arguments:

    * end_collection: arvados.collection.RichCollectionBase --- A
      collection object with the desired end state. The returned diff
      list will describe how to go from the current collection object
      `self` to `end_collection`.

    * prefix: str --- The name to use for this collection's stream in
      the diff list. Default `'.'`.

    * holding_collection: arvados.collection.Collection | None --- A
      collection object used to hold objects for the returned diff
      list. By default, a new empty collection is created.
    """
    changes = []
    if holding_collection is None:
        holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
    # First pass: items present here but absent from end_collection
    # become DEL entries (with a clone of the current contents).
    for k in self:
        if k not in end_collection:
           changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
    # Second pass: walk end_collection and record recursions, MOD/TOK
    # changes for items in both, and ADD entries for new items.
    for k in end_collection:
        if k in self:
            if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
            elif end_collection[k] != self[k]:
                # Contents differ: record a modification with both states.
                changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                # Same contents (possibly different access tokens): TOK.
                changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
        else:
            changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
    return changes

Build a list of differences between this collection and another

Arguments:

  • end_collection: RichCollectionBase — A collection object with the desired end state. The returned diff list will describe how to go from the current collection object self to end_collection.

  • prefix: str — The name to use for this collection’s stream in the diff list. Default '.'.

  • holding_collection: Collection | None — A collection object used to hold objects for the returned diff list. By default, a new empty collection is created.

@must_be_writable
@synchronized
def apply(self, changes: ChangeList) -> None:
    """Apply a list of changes to this collection

    This method takes a list of changes generated by
    `RichCollectionBase.diff` and applies it to this
    collection. Afterward, the state of this collection object will
    match the state of `end_collection` passed to `diff`. If a change
    conflicts with a local change, it will be saved to an alternate path
    indicating the conflict.

    Arguments:

    * changes: arvados.collection.ChangeList --- The list of differences
      generated by `RichCollectionBase.diff`.
    """
    if changes:
        self.set_committed(False)
    for change in changes:
        event_type = change[0]
        path = change[1]
        initial = change[2]
        local = self.find(path)
        # Conflicting updates are preserved under a timestamped
        # "<path>~<time>~conflict~" name instead of being discarded.
        conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                time.gmtime()))
        if event_type == ADD:
            if local is None:
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
            elif local is not None and local != initial:
                # There is already local file and it is different:
                # save change to conflict file.
                self.copy(initial, conflictpath)
        elif event_type == MOD or event_type == TOK:
            final = change[3]
            if local == initial:
                # Local matches the "initial" item so it has not
                # changed locally and is safe to update.
                if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                    # Replace contents of local file with new contents
                    local.replace_contents(final)
                else:
                    # Overwrite path with new item; this can happen if
                    # path was a file and is now a collection or vice versa
                    self.copy(final, path, overwrite=True)
            else:
                # Local is missing (presumably deleted) or local doesn't
                # match the "start" value, so save change to conflict file
                self.copy(final, conflictpath)
        elif event_type == DEL:
            if local == initial:
                # Local item matches "initial" value, so it is safe to remove.
                self.remove(path, recursive=True)
            # else, the file is modified or already removed, in either
            # case we don't want to try to remove it.

Apply a list of changes to this collection

This method takes a list of changes generated by RichCollectionBase.diff and applies it to this collection. Afterward, the state of this collection object will match the state of end_collection passed to diff. If a change conflicts with a local change, it will be saved to an alternate path indicating the conflict.

Arguments:

  • changes: ChangeList — The list of differences generated by RichCollectionBase.diff.

def portable_data_hash(self) -> str:
    """Get the portable data hash for this collection's manifest"""
    if self._manifest_locator and self.committed():
        # The collection is saved and unchanged, so trust the PDH the
        # API server reported for the record.
        return self._portable_data_hash
    # Otherwise compute it locally: md5 of the stripped portable
    # manifest, plus the manifest's length in bytes.
    manifest_bytes = self.portable_manifest_text().encode()
    digest = hashlib.md5(manifest_bytes).hexdigest()
    return '%s+%d' % (digest, len(manifest_bytes))

Get the portable data hash for this collection’s manifest

@synchronized
def subscribe(self, callback: ChangeCallback) -> None:
    """Set a notify callback for changes to this collection

    Arguments:

    * callback: arvados.collection.ChangeCallback --- The callable to
      call each time the collection is changed.
    """
    # Only a single subscriber is supported at a time.
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback

Set a notify callback for changes to this collection

Arguments:

  • callback: ChangeCallback — The callable to call each time the collection is changed.

@synchronized
def unsubscribe(self) -> None:
    """Remove any notify callback set for changes to this collection"""
    # Clearing an already-unset callback is a harmless no-op.
    self._callback = None

Remove any notify callback set for changes to this collection

@synchronized
def notify(
        self,
        event: ChangeType,
        collection: 'RichCollectionBase',
        name: str,
        item: CollectionItem,
) -> None:
    """Notify any subscribed callback about a change to this collection

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    If a callback has been registered with `RichCollectionBase.subscribe`,
    it will be called with information about a change to this collection.
    Then this notification will be propagated to this collection's root.

    Arguments:

    * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
      the collection.

    * collection: arvados.collection.RichCollectionBase --- The
      collection that was modified.

    * name: str --- The name of the file or stream within `collection` that
      was modified.

    * item: arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The new contents at `name`
      within `collection`.
    """
    subscriber = self._callback
    if subscriber:
        subscriber(event, collection, name, item)
    # Bubble the event up so subscribers on the root collection hear it too.
    self.root_collection().notify(event, collection, name, item)

Notify any subscribed callback about a change to this collection

If a callback has been registered with RichCollectionBase.subscribe, it will be called with information about a change to this collection. Then this notification will be propagated to this collection’s root.

Arguments:

  • event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.

  • collection: RichCollectionBase — The collection that was modified.

  • name: str — The name of the file or stream within collection that was modified.

  • item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at name within collection.

@synchronized
def flush(self) -> None:
    """Upload any pending data to Keep"""
    # Flush every immediate child; subcollections recurse on their own.
    # (Replaces the Python 2 compatibility shim future.utils.listvalues.)
    for child in self.values():
        child.flush()

Upload any pending data to Keep

Inherited Members
CollectionBase
stripped_manifest
class Collection(RichCollectionBase):
1032class Collection(RichCollectionBase):
1033    """Read and manipulate an Arvados collection
1034
1035    This class provides a high-level interface to create, read, and update
1036    Arvados collections and their contents. Refer to the Arvados Python SDK
1037    cookbook for [an introduction to using the Collection class][cookbook].
1038
1039    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1040    """
1041
1042    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1043                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1044                 keep_client: Optional['arvados.keep.KeepClient']=None,
1045                 num_retries: int=10,
1046                 parent: Optional['Collection']=None,
1047                 apiconfig: Optional[Mapping[str, str]]=None,
1048                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1049                 replication_desired: Optional[int]=None,
1050                 storage_classes_desired: Optional[List[str]]=None,
1051                 put_threads: Optional[int]=None):
1052        """Initialize a Collection object
1053
1054        Arguments:
1055
1056        * manifest_locator_or_text: str | None --- This string can contain a
1057          collection manifest text, portable data hash, or UUID. When given a
1058          portable data hash or UUID, this instance will load a collection
1059          record from the API server. Otherwise, this instance will represent a
1060          new collection without an API server record. The default value `None`
1061          instantiates a new collection with an empty manifest.
1062
1063        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1064          Arvados API client object this instance uses to make requests. If
1065          none is given, this instance creates its own client using the
1066          settings from `apiconfig` (see below). If your client instantiates
1067          many Collection objects, you can help limit memory utilization by
1068          calling `arvados.api.api` to construct an
1069          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1070          for every Collection.
1071
1072        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1073          object this instance uses to make requests. If none is given, this
1074          instance creates its own client using its `api_client`.
1075
1076        * num_retries: int --- The number of times that client requests are
1077          retried. Default 10.
1078
1079        * parent: arvados.collection.Collection | None --- The parent Collection
1080          object of this instance, if any. This argument is primarily used by
1081          other Collection methods; user client code shouldn't need to use it.
1082
1083        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1084          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1085          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1086          Collection object constructs one from these settings. If no
1087          mapping is provided, calls `arvados.config.settings` to get these
1088          parameters from user configuration.
1089
1090        * block_manager: arvados.arvfile._BlockManager | None --- The
1091          _BlockManager object used by this instance to coordinate reading
1092          and writing Keep data blocks. If none is given, this instance
1093          constructs its own. This argument is primarily used by other
1094          Collection methods; user client code shouldn't need to use it.
1095
1096        * replication_desired: int | None --- This controls both the value of
1097          the `replication_desired` field on API collection records saved by
1098          this class, as well as the number of Keep services that the object
1099          writes new data blocks to. If none is given, uses the default value
1100          configured for the cluster.
1101
1102        * storage_classes_desired: list[str] | None --- This controls both
1103          the value of the `storage_classes_desired` field on API collection
1104          records saved by this class, as well as selecting which specific
1105          Keep services the object writes new data blocks to. If none is
1106          given, defaults to an empty list.
1107
1108        * put_threads: int | None --- The number of threads to run
1109          simultaneously to upload data blocks to Keep. This value is used when
1110          building a new `block_manager`. It is unused when a `block_manager`
1111          is provided.
1112        """
1113
1114        if storage_classes_desired and type(storage_classes_desired) is not list:
1115            raise errors.ArgumentError("storage_classes_desired must be list type.")
1116
1117        super(Collection, self).__init__(parent)
1118        self._api_client = api_client
1119        self._keep_client = keep_client
1120
1121        # Use the keep client from ThreadSafeApiCache
1122        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1123            self._keep_client = self._api_client.keep
1124
1125        self._block_manager = block_manager
1126        self.replication_desired = replication_desired
1127        self._storage_classes_desired = storage_classes_desired
1128        self.put_threads = put_threads
1129
1130        if apiconfig:
1131            self._config = apiconfig
1132        else:
1133            self._config = config.settings()
1134
1135        self.num_retries = num_retries
1136        self._manifest_locator = None
1137        self._manifest_text = None
1138        self._portable_data_hash = None
1139        self._api_response = None
1140        self._past_versions = set()
1141
1142        self.lock = threading.RLock()
1143        self.events = None
1144
1145        if manifest_locator_or_text:
1146            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1147                self._manifest_locator = manifest_locator_or_text
1148            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1149                self._manifest_locator = manifest_locator_or_text
1150                if not self._has_local_collection_uuid():
1151                    self._has_remote_blocks = True
1152            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1153                self._manifest_text = manifest_locator_or_text
1154                if '+R' in self._manifest_text:
1155                    self._has_remote_blocks = True
1156            else:
1157                raise errors.ArgumentError(
1158                    "Argument to CollectionReader is not a manifest or a collection UUID")
1159
1160            try:
1161                self._populate()
1162            except errors.SyntaxError as e:
1163                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1164
1165    def storage_classes_desired(self) -> List[str]:
1166        """Get this collection's `storage_classes_desired` value"""
1167        return self._storage_classes_desired or []
1168
1169    def root_collection(self) -> 'Collection':
1170        return self
1171
1172    def get_properties(self) -> Properties:
1173        """Get this collection's properties
1174
1175        This method always returns a dict. If this collection object does not
1176        have an associated API record, or that record does not have any
1177        properties set, this method returns an empty dict.
1178        """
1179        if self._api_response and self._api_response["properties"]:
1180            return self._api_response["properties"]
1181        else:
1182            return {}
1183
1184    def get_trash_at(self) -> Optional[datetime.datetime]:
1185        """Get this collection's `trash_at` field
1186
1187        This method parses the `trash_at` field of the collection's API
1188        record and returns a datetime from it. If that field is not set, or
1189        this collection object does not have an associated API record,
1190        returns None.
1191        """
1192        if self._api_response and self._api_response["trash_at"]:
1193            try:
1194                return ciso8601.parse_datetime(self._api_response["trash_at"])
1195            except ValueError:
1196                return None
1197        else:
1198            return None
1199
1200    def stream_name(self) -> str:
1201        return "."
1202
1203    def writable(self) -> bool:
1204        return True
1205
1206    @synchronized
1207    def known_past_version(
1208            self,
1209            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1210    ) -> bool:
1211        """Indicate whether an API record for this collection has been seen before
1212
1213        As this collection object loads records from the API server, it records
1214        their `modified_at` and `portable_data_hash` fields. This method accepts
1215        a 2-tuple with values for those fields, and returns `True` if the
1216        combination was previously loaded.
1217        """
1218        return modified_at_and_portable_data_hash in self._past_versions
1219
    @synchronized
    @retry_method
    def update(
            self,
            other: Optional['Collection']=None,
            num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it.  If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
        """
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._remember_api_response(response)
            other = CollectionReader(response["manifest_text"])
        # Diff this instance's manifest as of the last load/save against the
        # other collection, then replay those changes onto the current state.
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        # Record the merged state as the new baseline for future updates.
        self._manifest_text = self.manifest_text()
1268
1269    @synchronized
1270    def _my_api(self):
1271        if self._api_client is None:
1272            self._api_client = ThreadSafeApiCache(self._config, version='v1')
1273            if self._keep_client is None:
1274                self._keep_client = self._api_client.keep
1275        return self._api_client
1276
1277    @synchronized
1278    def _my_keep(self):
1279        if self._keep_client is None:
1280            if self._api_client is None:
1281                self._my_api()
1282            else:
1283                self._keep_client = KeepClient(api_client=self._api_client)
1284        return self._keep_client
1285
1286    @synchronized
1287    def _my_block_manager(self):
1288        if self._block_manager is None:
1289            copies = (self.replication_desired or
1290                      self._my_api()._rootDesc.get('defaultCollectionReplication',
1291                                                   2))
1292            self._block_manager = _BlockManager(self._my_keep(),
1293                                                copies=copies,
1294                                                put_threads=self.put_threads,
1295                                                num_retries=self.num_retries,
1296                                                storage_classes_func=self.storage_classes_desired)
1297        return self._block_manager
1298
1299    def _remember_api_response(self, response):
1300        self._api_response = response
1301        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1302
    def _populate_from_api_server(self):
        """Fetch this collection's API record and cache its fields

        Looks the record up by `self._manifest_locator`, remembers the
        response, and copies the manifest text, portable data hash, and
        (unless the caller already set them) `replication_desired` /
        `storage_classes_desired` onto this instance.
        """
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        self._remember_api_response(self._my_api().collections().get(
            uuid=self._manifest_locator).execute(
                num_retries=self.num_retries))
        self._manifest_text = self._api_response['manifest_text']
        self._portable_data_hash = self._api_response['portable_data_hash']
        # If not overriden via kwargs, we should try to load the
        # replication_desired and storage_classes_desired from the API server
        if self.replication_desired is None:
            self.replication_desired = self._api_response.get('replication_desired', None)
        if self._storage_classes_desired is None:
            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1322
1323    def _populate(self):
1324        if self._manifest_text is None:
1325            if self._manifest_locator is None:
1326                return
1327            else:
1328                self._populate_from_api_server()
1329        self._baseline_manifest = self._manifest_text
1330        self._import_manifest(self._manifest_text)
1331
1332    def _has_collection_uuid(self):
1333        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1334
1335    def _has_local_collection_uuid(self):
1336        return self._has_collection_uuid and \
1337            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1338
1339    def __enter__(self):
1340        return self
1341
1342    def __exit__(self, exc_type, exc_value, traceback):
1343        """Exit a context with this collection instance
1344
1345        If no exception was raised inside the context block, and this
1346        collection is writable and has a corresponding API record, that
1347        record will be updated to match the state of this instance at the end
1348        of the block.
1349        """
1350        if exc_type is None:
1351            if self.writable() and self._has_collection_uuid():
1352                self.save()
1353        self.stop_threads()
1354
1355    def stop_threads(self) -> None:
1356        """Stop background Keep upload/download threads"""
1357        if self._block_manager is not None:
1358            self._block_manager.stop_threads()
1359
1360    @synchronized
1361    def manifest_locator(self) -> Optional[str]:
1362        """Get this collection's manifest locator, if any
1363
1364        * If this collection instance is associated with an API record with a
1365          UUID, return that.
1366        * Otherwise, if this collection instance was loaded from an API record
1367          by portable data hash, return that.
1368        * Otherwise, return `None`.
1369        """
1370        return self._manifest_locator
1371
1372    @synchronized
1373    def clone(
1374            self,
1375            new_parent: Optional['Collection']=None,
1376            new_name: Optional[str]=None,
1377            readonly: bool=False,
1378            new_config: Optional[Mapping[str, str]]=None,
1379    ) -> 'Collection':
1380        """Create a Collection object with the same contents as this instance
1381
1382        This method creates a new Collection object with contents that match
1383        this instance's. The new collection will not be associated with any API
1384        record.
1385
1386        Arguments:
1387
1388        * new_parent: arvados.collection.Collection | None --- This value is
1389          passed to the new Collection's constructor as the `parent`
1390          argument.
1391
1392        * new_name: str | None --- This value is unused.
1393
1394        * readonly: bool --- If this value is true, this method constructs and
1395          returns a `CollectionReader`. Otherwise, it returns a mutable
1396          `Collection`. Default `False`.
1397
1398        * new_config: Mapping[str, str] | None --- This value is passed to the
1399          new Collection's constructor as `apiconfig`. If no value is provided,
1400          defaults to the configuration passed to this instance's constructor.
1401        """
1402        if new_config is None:
1403            new_config = self._config
1404        if readonly:
1405            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1406        else:
1407            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1408
1409        newcollection._clonefrom(self)
1410        return newcollection
1411
1412    @synchronized
1413    def api_response(self) -> Optional[Dict[str, Any]]:
1414        """Get this instance's associated API record
1415
1416        If this Collection instance has an associated API record, return it.
1417        Otherwise, return `None`.
1418        """
1419        return self._api_response
1420
1421    def find_or_create(
1422            self,
1423            path: str,
1424            create_type: CreateType,
1425    ) -> CollectionItem:
1426        if path == ".":
1427            return self
1428        else:
1429            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1430
1431    def find(self, path: str) -> CollectionItem:
1432        if path == ".":
1433            return self
1434        else:
1435            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1436
1437    def remove(self, path: str, recursive: bool=False) -> None:
1438        if path == ".":
1439            raise errors.ArgumentError("Cannot remove '.'")
1440        else:
1441            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1442
    @must_be_writable
    @synchronized
    @retry_method
    def save(
            self,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            merge: bool=True,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ) -> str:
        """Save collection to an existing API record

        This method updates the instance's corresponding API record to match
        the instance's state. If this instance does not have a corresponding API
        record yet, raises `AssertionError`. (To create a new API record, use
        `Collection.save_new`.) This method returns the saved collection
        manifest.

        Arguments:

        * properties: dict[str, Any] | None --- If provided, the API record will
          be updated with these properties. Note this will completely replace
          any existing properties.

        * storage_classes: list[str] | None --- If provided, the API record will
          be updated with this value in the `storage_classes_desired` field.
          This value will also be saved on the instance and used for any
          changes that follow.

        * trash_at: datetime.datetime | None --- If provided, the API record
          will be updated with this value in the `trash_at` field.

        * merge: bool --- If `True` (the default), this method will first
          reload this collection's API record, and merge any new contents into
          this instance before saving changes. See `Collection.update` for
          details.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed to directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        # Validate argument types up front so we fail before any network calls.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        # Assemble only the record fields that are actually changing.
        body={}
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            # NOTE(review): the timestamp is formatted as-is and suffixed "Z";
            # a timezone-aware non-UTC datetime would be mislabeled as UTC --
            # confirm callers pass UTC values.
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            # There are unsaved content changes: push data blocks, then save
            # the new manifest along with any metadata fields above.
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            self._my_block_manager().commit_all()

            if merge:
                self.update()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)
        elif body:
            # No content changes, but metadata fields were provided: update
            # just those on the API record.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))

        return self._manifest_text
1550
1551
    @must_be_writable
    @synchronized
    @retry_method
    def save_new(
            self,
            name: Optional[str]=None,
            create_collection_record: bool=True,
            owner_uuid: Optional[str]=None,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            ensure_unique_name: bool=False,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ):
        """Save collection to a new API record

        This method finishes uploading new data blocks and (optionally)
        creates a new API collection record with the provided data. If a new
        record is created, this instance becomes associated with that record
        for future updates like `save()`. This method returns the saved
        collection manifest.

        Arguments:

        * name: str | None --- The `name` field to use on the new collection
          record. If not specified, a generic default name is generated.

        * create_collection_record: bool --- If `True` (the default), creates a
          collection record on the API server. If `False`, the method finishes
          all data uploads and only returns the resulting collection manifest
          without sending it to the API server.

        * owner_uuid: str | None --- The `owner_uuid` field to use on the
          new collection record.

        * properties: dict[str, Any] | None --- The `properties` field to use on
          the new collection record.

        * storage_classes: list[str] | None --- The
          `storage_classes_desired` field to use on the new collection record.

        * trash_at: datetime.datetime | None --- The `trash_at` field to use
          on the new collection record.

        * ensure_unique_name: bool --- This value is passed directly to the
          Arvados API when creating the collection record. If `True`, the API
          server may modify the submitted `name` to ensure the collection's
          `name`+`owner_uuid` combination is unique. If `False` (the default),
          if a collection already exists with this same `name`+`owner_uuid`
          combination, creating a collection record will raise a validation
          error.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed to directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        # Validate argument types up front so we fail before any network calls.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False

        if storage_classes:
            self._storage_classes_desired = storage_classes

        # Finish uploading all pending data before computing the manifest.
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                # The generic default name is likely to collide, so ask the
                # API server to uniquify it.
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "name": name,
                    "replication_desired": self.replication_desired}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid
            if properties:
                body["properties"] = properties
            if self.storage_classes_desired():
                body["storage_classes_desired"] = self.storage_classes_desired()
            if trash_at:
                # NOTE(review): the timestamp is formatted as-is and suffixed
                # "Z"; a timezone-aware non-UTC datetime would be mislabeled
                # as UTC -- confirm callers pass UTC values.
                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                body["trash_at"] = t
            if preserve_version:
                body["preserve_version"] = preserve_version

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            # Adopt the server's canonical form of the manifest and bind this
            # instance to the newly created record.
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]
            self._portable_data_hash = self._api_response["portable_data_hash"]

            self._manifest_text = text
            self.set_committed(True)

        return text
1669
    # Manifest parsing helpers: a manifest is a sequence of
    # whitespace-separated tokens, matched one at a time by _token_re.
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # Keep block locator token: 32 hex digits, "+<size>", then optional
    # "+hint" fields.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # File segment token: "<position>:<size>:<name>".
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1673
1674    def _unescape_manifest_path(self, path):
1675        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1676
1677    @synchronized
1678    def _import_manifest(self, manifest_text):
1679        """Import a manifest into a `Collection`.
1680
1681        :manifest_text:
1682          The manifest text to import from.
1683
1684        """
1685        if len(self) > 0:
1686            raise ArgumentError("Can only import manifest into an empty collection")
1687
1688        STREAM_NAME = 0
1689        BLOCKS = 1
1690        SEGMENTS = 2
1691
1692        stream_name = None
1693        state = STREAM_NAME
1694
1695        for token_and_separator in self._token_re.finditer(manifest_text):
1696            tok = token_and_separator.group(1)
1697            sep = token_and_separator.group(2)
1698
1699            if state == STREAM_NAME:
1700                # starting a new stream
1701                stream_name = self._unescape_manifest_path(tok)
1702                blocks = []
1703                segments = []
1704                streamoffset = 0
1705                state = BLOCKS
1706                self.find_or_create(stream_name, COLLECTION)
1707                continue
1708
1709            if state == BLOCKS:
1710                block_locator = self._block_re.match(tok)
1711                if block_locator:
1712                    blocksize = int(block_locator.group(1))
1713                    blocks.append(Range(tok, streamoffset, blocksize, 0))
1714                    streamoffset += blocksize
1715                else:
1716                    state = SEGMENTS
1717
1718            if state == SEGMENTS:
1719                file_segment = self._segment_re.match(tok)
1720                if file_segment:
1721                    pos = int(file_segment.group(1))
1722                    size = int(file_segment.group(2))
1723                    name = self._unescape_manifest_path(file_segment.group(3))
1724                    if name.split('/')[-1] == '.':
1725                        # placeholder for persisting an empty directory, not a real file
1726                        if len(name) > 2:
1727                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1728                    else:
1729                        filepath = os.path.join(stream_name, name)
1730                        try:
1731                            afile = self.find_or_create(filepath, FILE)
1732                        except IOError as e:
1733                            if e.errno == errno.ENOTDIR:
1734                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1735                            else:
1736                                raise e from None
1737                        if isinstance(afile, ArvadosFile):
1738                            afile.add_segment(blocks, pos, size)
1739                        else:
1740                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1741                else:
1742                    # error!
1743                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1744
1745            if sep == "\n":
1746                stream_name = None
1747                state = STREAM_NAME
1748
1749        self.set_committed(True)
1750
1751    @synchronized
1752    def notify(
1753            self,
1754            event: ChangeType,
1755            collection: 'RichCollectionBase',
1756            name: str,
1757            item: CollectionItem,
1758    ) -> None:
1759        if self._callback:
1760            self._callback(event, collection, name, item)

Read and manipulate an Arvados collection

This class provides a high-level interface to create, read, and update Arvados collections and their contents. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.

Collection( manifest_locator_or_text: Optional[str] = None, api_client: Optional[arvados.api_resources.ArvadosAPIClient] = None, keep_client: Optional[arvados.keep.KeepClient] = None, num_retries: int = 10, parent: Optional[Collection] = None, apiconfig: Optional[Mapping[str, str]] = None, block_manager: Optional[arvados.arvfile._BlockManager] = None, replication_desired: Optional[int] = None, storage_classes_desired: Optional[List[str]] = None, put_threads: Optional[int] = None)
1042    def __init__(self, manifest_locator_or_text: Optional[str]=None,
1043                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1044                 keep_client: Optional['arvados.keep.KeepClient']=None,
1045                 num_retries: int=10,
1046                 parent: Optional['Collection']=None,
1047                 apiconfig: Optional[Mapping[str, str]]=None,
1048                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1049                 replication_desired: Optional[int]=None,
1050                 storage_classes_desired: Optional[List[str]]=None,
1051                 put_threads: Optional[int]=None):
1052        """Initialize a Collection object
1053
1054        Arguments:
1055
1056        * manifest_locator_or_text: str | None --- This string can contain a
1057          collection manifest text, portable data hash, or UUID. When given a
1058          portable data hash or UUID, this instance will load a collection
1059          record from the API server. Otherwise, this instance will represent a
1060          new collection without an API server record. The default value `None`
1061          instantiates a new collection with an empty manifest.
1062
1063        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1064          Arvados API client object this instance uses to make requests. If
1065          none is given, this instance creates its own client using the
1066          settings from `apiconfig` (see below). If your client instantiates
1067          many Collection objects, you can help limit memory utilization by
1068          calling `arvados.api.api` to construct an
1069          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1070          for every Collection.
1071
1072        * keep_client: arvados.keep.KeepClient | None --- The Keep client
1073          object this instance uses to make requests. If none is given, this
1074          instance creates its own client using its `api_client`.
1075
1076        * num_retries: int --- The number of times that client requests are
1077          retried. Default 10.
1078
1079        * parent: arvados.collection.Collection | None --- The parent Collection
1080          object of this instance, if any. This argument is primarily used by
1081          other Collection methods; user client code shouldn't need to use it.
1082
1083        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1084          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1085          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1086          Collection object constructs one from these settings. If no
1087          mapping is provided, calls `arvados.config.settings` to get these
1088          parameters from user configuration.
1089
1090        * block_manager: arvados.arvfile._BlockManager | None --- The
1091          _BlockManager object used by this instance to coordinate reading
1092          and writing Keep data blocks. If none is given, this instance
1093          constructs its own. This argument is primarily used by other
1094          Collection methods; user client code shouldn't need to use it.
1095
1096        * replication_desired: int | None --- This controls both the value of
1097          the `replication_desired` field on API collection records saved by
1098          this class, as well as the number of Keep services that the object
1099          writes new data blocks to. If none is given, uses the default value
1100          configured for the cluster.
1101
1102        * storage_classes_desired: list[str] | None --- This controls both
1103          the value of the `storage_classes_desired` field on API collection
1104          records saved by this class, as well as selecting which specific
1105          Keep services the object writes new data blocks to. If none is
1106          given, defaults to an empty list.
1107
1108        * put_threads: int | None --- The number of threads to run
1109          simultaneously to upload data blocks to Keep. This value is used when
1110          building a new `block_manager`. It is unused when a `block_manager`
1111          is provided.
1112        """
1113
1114        if storage_classes_desired and type(storage_classes_desired) is not list:
1115            raise errors.ArgumentError("storage_classes_desired must be list type.")
1116
1117        super(Collection, self).__init__(parent)
1118        self._api_client = api_client
1119        self._keep_client = keep_client
1120
1121        # Use the keep client from ThreadSafeApiCache
1122        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1123            self._keep_client = self._api_client.keep
1124
1125        self._block_manager = block_manager
1126        self.replication_desired = replication_desired
1127        self._storage_classes_desired = storage_classes_desired
1128        self.put_threads = put_threads
1129
1130        if apiconfig:
1131            self._config = apiconfig
1132        else:
1133            self._config = config.settings()
1134
1135        self.num_retries = num_retries
1136        self._manifest_locator = None
1137        self._manifest_text = None
1138        self._portable_data_hash = None
1139        self._api_response = None
1140        self._past_versions = set()
1141
1142        self.lock = threading.RLock()
1143        self.events = None
1144
1145        if manifest_locator_or_text:
1146            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1147                self._manifest_locator = manifest_locator_or_text
1148            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1149                self._manifest_locator = manifest_locator_or_text
1150                if not self._has_local_collection_uuid():
1151                    self._has_remote_blocks = True
1152            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1153                self._manifest_text = manifest_locator_or_text
1154                if '+R' in self._manifest_text:
1155                    self._has_remote_blocks = True
1156            else:
1157                raise errors.ArgumentError(
1158                    "Argument to CollectionReader is not a manifest or a collection UUID")
1159
1160            try:
1161                self._populate()
1162            except errors.SyntaxError as e:
1163                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None

Initialize a Collection object

Arguments:

  • manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value None instantiates a new collection with an empty manifest.

  • api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from apiconfig (see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling arvados.api.api to construct an arvados.safeapi.ThreadSafeApiCache, and use that as the api_client for every Collection.

  • keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its api_client.

  • num_retries: int — The number of times that client requests are retried. Default 10.

  • parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • apiconfig: Mapping[str, str] | None — A mapping with entries for ARVADOS_API_HOST, ARVADOS_API_TOKEN, and optionally ARVADOS_API_HOST_INSECURE. When no api_client is provided, the Collection object constructs one from these settings. If no mapping is provided, calls arvados.config.settings to get these parameters from user configuration.

  • block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • replication_desired: int | None — This controls both the value of the replication_desired field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.

  • storage_classes_desired: list[str] | None — This controls both the value of the storage_classes_desired field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.

  • put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new block_manager. It is unused when a block_manager is provided.

replication_desired
put_threads
num_retries
lock
events
def storage_classes_desired(self) -> List[str]:
1165    def storage_classes_desired(self) -> List[str]:
1166        """Get this collection's `storage_classes_desired` value"""
1167        return self._storage_classes_desired or []

Get this collection’s storage_classes_desired value

def root_collection(self) -> Collection:
1169    def root_collection(self) -> 'Collection':
1170        return self

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def get_properties(self) -> Dict[str, Any]:
1172    def get_properties(self) -> Properties:
1173        """Get this collection's properties
1174
1175        This method always returns a dict. If this collection object does not
1176        have an associated API record, or that record does not have any
1177        properties set, this method returns an empty dict.
1178        """
1179        if self._api_response and self._api_response["properties"]:
1180            return self._api_response["properties"]
1181        else:
1182            return {}

Get this collection’s properties

This method always returns a dict. If this collection object does not have an associated API record, or that record does not have any properties set, this method returns an empty dict.

def get_trash_at(self) -> Optional[datetime.datetime]:
1184    def get_trash_at(self) -> Optional[datetime.datetime]:
1185        """Get this collection's `trash_at` field
1186
1187        This method parses the `trash_at` field of the collection's API
1188        record and returns a datetime from it. If that field is not set, or
1189        this collection object does not have an associated API record,
1190        returns None.
1191        """
1192        if self._api_response and self._api_response["trash_at"]:
1193            try:
1194                return ciso8601.parse_datetime(self._api_response["trash_at"])
1195            except ValueError:
1196                return None
1197        else:
1198            return None

Get this collection’s trash_at field

This method parses the trash_at field of the collection’s API record and returns a datetime from it. If that field is not set, or this collection object does not have an associated API record, returns None.

def stream_name(self) -> str:
1200    def stream_name(self) -> str:
1201        return "."

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

def writable(self) -> bool:
1203    def writable(self) -> bool:
1204        return True

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

@synchronized
def known_past_version( self, modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]) -> bool:
1206    @synchronized
1207    def known_past_version(
1208            self,
1209            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1210    ) -> bool:
1211        """Indicate whether an API record for this collection has been seen before
1212
1213        As this collection object loads records from the API server, it records
1214        their `modified_at` and `portable_data_hash` fields. This method accepts
1215        a 2-tuple with values for those fields, and returns `True` if the
1216        combination was previously loaded.
1217        """
1218        return modified_at_and_portable_data_hash in self._past_versions

Indicate whether an API record for this collection has been seen before

As this collection object loads records from the API server, it records their modified_at and portable_data_hash fields. This method accepts a 2-tuple with values for those fields, and returns True if the combination was previously loaded.

@synchronized
@retry_method
def update( self, other: Optional[Collection] = None, num_retries: Optional[int] = None) -> None:
    @synchronized
    @retry_method
    def update(
            self,
            other: Optional['Collection']=None,
            num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it.  If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
        """
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            # Reload the server-side record to merge against.
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._remember_api_response(response)
            other = CollectionReader(response["manifest_text"])
        # Diff our last-saved manifest against `other`, then apply those
        # changes on top of this collection's current in-memory state.
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        # Record the merged manifest as the new baseline for future updates.
        self._manifest_text = self.manifest_text()

Merge another collection’s contents into this one

This method compares the manifest of this collection instance with another, then updates this instance’s manifest with changes from the other, renaming files to flag conflicts where necessary.

When called without any arguments, this method reloads the collection’s API record, and updates this instance with any changes that have appeared server-side. If this instance does not have a corresponding API record, this method raises arvados.errors.ArgumentError.

Arguments:

  • other: Collection | None — The collection whose contents should be merged into this instance. When not provided, this method reloads this collection’s API record and constructs a Collection object from it. If this instance does not have a corresponding API record, this method raises arvados.errors.ArgumentError.

  • num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the num_retries provided when this instance was constructed.

def stop_threads(self) -> None:
1355    def stop_threads(self) -> None:
1356        """Stop background Keep upload/download threads"""
1357        if self._block_manager is not None:
1358            self._block_manager.stop_threads()

Stop background Keep upload/download threads

@synchronized
def manifest_locator(self) -> Optional[str]:
1360    @synchronized
1361    def manifest_locator(self) -> Optional[str]:
1362        """Get this collection's manifest locator, if any
1363
1364        * If this collection instance is associated with an API record with a
1365          UUID, return that.
1366        * Otherwise, if this collection instance was loaded from an API record
1367          by portable data hash, return that.
1368        * Otherwise, return `None`.
1369        """
1370        return self._manifest_locator

Get this collection’s manifest locator, if any

  • If this collection instance is associated with an API record with a UUID, return that.
  • Otherwise, if this collection instance was loaded from an API record by portable data hash, return that.
  • Otherwise, return None.
@synchronized
def clone( self, new_parent: Optional[Collection] = None, new_name: Optional[str] = None, readonly: bool = False, new_config: Optional[Mapping[str, str]] = None) -> Collection:
1372    @synchronized
1373    def clone(
1374            self,
1375            new_parent: Optional['Collection']=None,
1376            new_name: Optional[str]=None,
1377            readonly: bool=False,
1378            new_config: Optional[Mapping[str, str]]=None,
1379    ) -> 'Collection':
1380        """Create a Collection object with the same contents as this instance
1381
1382        This method creates a new Collection object with contents that match
1383        this instance's. The new collection will not be associated with any API
1384        record.
1385
1386        Arguments:
1387
1388        * new_parent: arvados.collection.Collection | None --- This value is
1389          passed to the new Collection's constructor as the `parent`
1390          argument.
1391
1392        * new_name: str | None --- This value is unused.
1393
1394        * readonly: bool --- If this value is true, this method constructs and
1395          returns a `CollectionReader`. Otherwise, it returns a mutable
1396          `Collection`. Default `False`.
1397
1398        * new_config: Mapping[str, str] | None --- This value is passed to the
1399          new Collection's constructor as `apiconfig`. If no value is provided,
1400          defaults to the configuration passed to this instance's constructor.
1401        """
1402        if new_config is None:
1403            new_config = self._config
1404        if readonly:
1405            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1406        else:
1407            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1408
1409        newcollection._clonefrom(self)
1410        return newcollection

Create a Collection object with the same contents as this instance

This method creates a new Collection object with contents that match this instance’s. The new collection will not be associated with any API record.

Arguments:

  • new_parent: Collection | None — This value is passed to the new Collection’s constructor as the parent argument.

  • new_name: str | None — This value is unused.

  • readonly: bool — If this value is true, this method constructs and returns a CollectionReader. Otherwise, it returns a mutable Collection. Default False.

  • new_config: Mapping[str, str] | None — This value is passed to the new Collection’s constructor as apiconfig. If no value is provided, defaults to the configuration passed to this instance’s constructor.

@synchronized
def api_response(self) -> Optional[Dict[str, Any]]:
1412    @synchronized
1413    def api_response(self) -> Optional[Dict[str, Any]]:
1414        """Get this instance's associated API record
1415
1416        If this Collection instance has an associated API record, return it.
1417        Otherwise, return `None`.
1418        """
1419        return self._api_response

Get this instance’s associated API record

If this Collection instance has an associated API record, return it. Otherwise, return None.

def find_or_create( self, path: str, create_type: Literal['collection', 'file']) -> Union[arvados.arvfile.ArvadosFile, Collection]:
1421    def find_or_create(
1422            self,
1423            path: str,
1424            create_type: CreateType,
1425    ) -> CollectionItem:
1426        if path == ".":
1427            return self
1428        else:
1429            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

Get the item at the given path, creating it if necessary

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of create_type.

Arguments:

  • path: str — The path to find or create within this collection.

  • create_type: Literal[COLLECTION, FILE] — The type of object to create at path if one does not exist. Passing COLLECTION creates a stream and returns the corresponding Subcollection. Passing FILE creates a new file and returns the corresponding arvados.arvfile.ArvadosFile.

def find( self, path: str) -> Union[arvados.arvfile.ArvadosFile, Collection]:
1431    def find(self, path: str) -> CollectionItem:
1432        if path == ".":
1433            return self
1434        else:
1435            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

Get the item at the given path

If path refers to a stream in this collection, returns a corresponding Subcollection object. If path refers to a file in this collection, returns a corresponding arvados.arvfile.ArvadosFile object. If path does not exist in this collection, then this method raises NotADirectoryError.

Arguments:

  • path: str — The path to find within this collection.
def remove(self, path: str, recursive: bool = False) -> None:
1437    def remove(self, path: str, recursive: bool=False) -> None:
1438        if path == ".":
1439            raise errors.ArgumentError("Cannot remove '.'")
1440        else:
1441            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)

Remove the file or stream at path

Arguments:

  • path: str — The path of the item to remove from the collection

  • recursive: bool — Controls the method’s behavior if path refers to a nonempty stream. If False (the default), this method raises OSError with errno ENOTEMPTY. If True, this method removes all items under the stream.

@must_be_writable
@synchronized
@retry_method
def save( self, properties: Optional[Dict[str, Any]] = None, storage_classes: Optional[List[str]] = None, trash_at: Optional[datetime.datetime] = None, merge: bool = True, num_retries: Optional[int] = None, preserve_version: bool = False) -> str:
    @must_be_writable
    @synchronized
    @retry_method
    def save(
            self,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            merge: bool=True,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ) -> str:
        """Save collection to an existing API record

        This method updates the instance's corresponding API record to match
        the instance's state. If this instance does not have a corresponding API
        record yet, raises `AssertionError`. (To create a new API record, use
        `Collection.save_new`.) This method returns the saved collection
        manifest.

        Arguments:

        * properties: dict[str, Any] | None --- If provided, the API record will
          be updated with these properties. Note this will completely replace
          any existing properties.

        * storage_classes: list[str] | None --- If provided, the API record will
          be updated with this value in the `storage_classes_desired` field.
          This value will also be saved on the instance and used for any
          changes that follow.

        * trash_at: datetime.datetime | None --- If provided, the API record
          will be updated with this value in the `trash_at` field.

        * merge: bool --- If `True` (the default), this method will first
          reload this collection's API record, and merge any new contents into
          this instance before saving changes. See `Collection.update` for
          details.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed directly to
          the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        # Validate arguments before touching any state.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        # Build the update request body from the metadata arguments.
        body={}
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            # The API expects an ISO 8601 UTC timestamp string.
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            # There is uncommitted content: upload data, merge, and save the
            # new manifest along with any metadata changes.
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            # Flush any pending block writes to Keep before serializing.
            self._my_block_manager().commit_all()

            if merge:
                self.update()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)
        elif body:
            # No content changes, but metadata (properties, storage classes,
            # trash_at, preserve_version) still needs to be sent.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))

        return self._manifest_text

Save collection to an existing API record

This method updates the instance’s corresponding API record to match the instance’s state. If this instance does not have a corresponding API record yet, raises AssertionError. (To create a new API record, use Collection.save_new.) This method returns the saved collection manifest.

Arguments:

  • properties: dict[str, Any] | None — If provided, the API record will be updated with these properties. Note this will completely replace any existing properties.

  • storage_classes: list[str] | None — If provided, the API record will be updated with this value in the storage_classes_desired field. This value will also be saved on the instance and used for any changes that follow.

  • trash_at: datetime.datetime | None — If provided, the API record will be updated with this value in the trash_at field.

  • merge: bool — If True (the default), this method will first reload this collection’s API record, and merge any new contents into this instance before saving changes. See Collection.update for details.

  • num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the num_retries provided when this instance was constructed.

  • preserve_version: bool — This value will be passed directly to the underlying API call. If True, the Arvados API will preserve the versions of this collection both immediately before and after the update. If True when the API server is not configured with collection versioning, this method raises arvados.errors.ArgumentError.

@must_be_writable
@synchronized
@retry_method
def save_new( self, name: Optional[str] = None, create_collection_record: bool = True, owner_uuid: Optional[str] = None, properties: Optional[Dict[str, Any]] = None, storage_classes: Optional[List[str]] = None, trash_at: Optional[datetime.datetime] = None, ensure_unique_name: bool = False, num_retries: Optional[int] = None, preserve_version: bool = False):
1552    @must_be_writable
1553    @synchronized
1554    @retry_method
1555    def save_new(
1556            self,
1557            name: Optional[str]=None,
1558            create_collection_record: bool=True,
1559            owner_uuid: Optional[str]=None,
1560            properties: Optional[Properties]=None,
1561            storage_classes: Optional[StorageClasses]=None,
1562            trash_at: Optional[datetime.datetime]=None,
1563            ensure_unique_name: bool=False,
1564            num_retries: Optional[int]=None,
1565            preserve_version: bool=False,
1566    ):
1567        """Save collection to a new API record
1568
1569        This method finishes uploading new data blocks and (optionally)
1570        creates a new API collection record with the provided data. If a new
1571        record is created, this instance becomes associated with that record
1572        for future updates like `save()`. This method returns the saved
1573        collection manifest.
1574
1575        Arguments:
1576
1577        * name: str | None --- The `name` field to use on the new collection
1578          record. If not specified, a generic default name is generated.
1579
1580        * create_collection_record: bool --- If `True` (the default), creates a
1581          collection record on the API server. If `False`, the method finishes
1582          all data uploads and only returns the resulting collection manifest
1583          without sending it to the API server.
1584
1585        * owner_uuid: str | None --- The `owner_uuid` field to use on the
1586          new collection record.
1587
1588        * properties: dict[str, Any] | None --- The `properties` field to use on
1589          the new collection record.
1590
1591        * storage_classes: list[str] | None --- The
1592          `storage_classes_desired` field to use on the new collection record.
1593
1594        * trash_at: datetime.datetime | None --- The `trash_at` field to use
1595          on the new collection record.
1596
1597        * ensure_unique_name: bool --- This value is passed directly to the
1598          Arvados API when creating the collection record. If `True`, the API
1599          server may modify the submitted `name` to ensure the collection's
1600          `name`+`owner_uuid` combination is unique. If `False` (the default),
1601          if a collection already exists with this same `name`+`owner_uuid`
1602          combination, creating a collection record will raise a validation
1603          error.
1604
1605        * num_retries: int | None --- The number of times to retry reloading
1606          the collection's API record from the API server. If not specified,
1607          uses the `num_retries` provided when this instance was constructed.
1608
1609        * preserve_version: bool --- This value will be passed to directly
1610          to the underlying API call. If `True`, the Arvados API will
1611          preserve the versions of this collection both immediately before
1612          and after the update. If `True` when the API server is not
1613          configured with collection versioning, this method raises
1614          `arvados.errors.ArgumentError`.
1615        """
1616        if properties and type(properties) is not dict:
1617            raise errors.ArgumentError("properties must be dictionary type.")
1618
1619        if storage_classes and type(storage_classes) is not list:
1620            raise errors.ArgumentError("storage_classes must be list type.")
1621
1622        if trash_at and type(trash_at) is not datetime.datetime:
1623            raise errors.ArgumentError("trash_at must be datetime type.")
1624
1625        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1626            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1627
1628        if self._has_remote_blocks:
1629            # Copy any remote blocks to the local cluster.
1630            self._copy_remote_blocks(remote_blocks={})
1631            self._has_remote_blocks = False
1632
1633        if storage_classes:
1634            self._storage_classes_desired = storage_classes
1635
1636        self._my_block_manager().commit_all()
1637        text = self.manifest_text(strip=False)
1638
1639        if create_collection_record:
1640            if name is None:
1641                name = "New collection"
1642                ensure_unique_name = True
1643
1644            body = {"manifest_text": text,
1645                    "name": name,
1646                    "replication_desired": self.replication_desired}
1647            if owner_uuid:
1648                body["owner_uuid"] = owner_uuid
1649            if properties:
1650                body["properties"] = properties
1651            if self.storage_classes_desired():
1652                body["storage_classes_desired"] = self.storage_classes_desired()
1653            if trash_at:
1654                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1655                body["trash_at"] = t
1656            if preserve_version:
1657                body["preserve_version"] = preserve_version
1658
1659            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1660            text = self._api_response["manifest_text"]
1661
1662            self._manifest_locator = self._api_response["uuid"]
1663            self._portable_data_hash = self._api_response["portable_data_hash"]
1664
1665            self._manifest_text = text
1666            self.set_committed(True)
1667
1668        return text

Save collection to a new API record

This method finishes uploading new data blocks and (optionally) creates a new API collection record with the provided data. If a new record is created, this instance becomes associated with that record for future updates like save(). This method returns the saved collection manifest.

Arguments:

  • name: str | None — The name field to use on the new collection record. If not specified, a generic default name is generated.

  • create_collection_record: bool — If True (the default), creates a collection record on the API server. If False, the method finishes all data uploads and only returns the resulting collection manifest without sending it to the API server.

  • owner_uuid: str | None — The owner_uuid field to use on the new collection record.

  • properties: dict[str, Any] | None — The properties field to use on the new collection record.

  • storage_classes: list[str] | None — The storage_classes_desired field to use on the new collection record.

  • trash_at: datetime.datetime | None — The trash_at field to use on the new collection record.

  • ensure_unique_name: bool — This value is passed directly to the Arvados API when creating the collection record. If True, the API server may modify the submitted name to ensure the collection’s name+owner_uuid combination is unique. If False (the default), if a collection already exists with this same name+owner_uuid combination, creating a collection record will raise a validation error.

  • num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the num_retries provided when this instance was constructed.

  • preserve_version: bool — This value will be passed directly to the underlying API call. If True, the Arvados API will preserve the versions of this collection both immediately before and after the update. If True when the API server is not configured with collection versioning, this method raises arvados.errors.ArgumentError.

@synchronized
def notify( self, event: Literal['add', 'del', 'mod', 'tok'], collection: RichCollectionBase, name: str, item: Union[arvados.arvfile.ArvadosFile, Collection]) -> None:
1751    @synchronized
1752    def notify(
1753            self,
1754            event: ChangeType,
1755            collection: 'RichCollectionBase',
1756            name: str,
1757            item: CollectionItem,
1758    ) -> None:
1759        if self._callback:
1760            self._callback(event, collection, name, item)

Notify any subscribed callback about a change to this collection

If a callback has been registered with RichCollectionBase.subscribe, it will be called with information about a change to this collection. Then this notification will be propagated to this collection’s root.

Arguments:

  • event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.

  • collection: RichCollectionBase — The collection that was modified.

  • name: str — The name of the file or stream within collection that was modified.

  • item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at name within collection.

class Subcollection(RichCollectionBase):
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) within an Arvados
    `Collection`. It is returned by `Collection.find` and provides the same API.
    Operations that work on the API collection record propagate to the parent
    `Collection` object.
    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        self.name = name
        self.num_retries = parent.num_retries
        # Share the root collection's lock; a subcollection carries no
        # independently-guarded state of its own.
        self.lock = self.root_collection().lock
        self._manifest_text = None

    def root_collection(self) -> 'Collection':
        """Get this collection's root collection object"""
        return self.parent.root_collection()

    def writable(self) -> bool:
        """Indicate whether this collection object can be modified"""
        return self.root_collection().writable()

    def _my_api(self):
        # The API client belongs to the root collection.
        return self.root_collection()._my_api()

    def _my_keep(self):
        # The Keep client belongs to the root collection.
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        # The block manager belongs to the root collection.
        return self.root_collection()._my_block_manager()

    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this collection"""
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Create a copy of this subcollection under `new_parent`/`new_name`."""
        copied = Subcollection(new_parent, new_name)
        copied._clonefrom(self)
        return copied

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        # Detach from the current parent before attaching under the new one;
        # the statement order here is significant.
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Encode empty directories by using an \056-named (".") empty file"""
        if self._items:
            return super(Subcollection, self)._get_manifest_text(
                stream_name, strip, normalize, only_committed)
        # Empty stream: emit a single zero-length file whose escaped name
        # is "." so the stream still appears in the manifest.
        return "%s %s 0:0:\\056\n" % (
            escape(stream_name), config.EMPTY_BLOCK_LOCATOR)

Read and manipulate a stream/directory within an Arvados collection

This class represents a single stream (like a directory) within an Arvados Collection. It is returned by Collection.find and provides the same API. Operations that work on the API collection record propagate to the parent Collection object.

Subcollection(parent, name)
1772    def __init__(self, parent, name):
1773        super(Subcollection, self).__init__(parent)
1774        self.lock = self.root_collection().lock
1775        self._manifest_text = None
1776        self.name = name
1777        self.num_retries = parent.num_retries
lock
name
num_retries
def root_collection(self) -> Collection:
1779    def root_collection(self) -> 'Collection':
1780        return self.parent.root_collection()

Get this collection’s root collection object

If you open a subcollection with Collection.find, calling this method on that subcollection returns the source Collection object.

def writable(self) -> bool:
1782    def writable(self) -> bool:
1783        return self.root_collection().writable()

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

def stream_name(self) -> str:
1794    def stream_name(self) -> str:
1795        return os.path.join(self.parent.stream_name(), self.name)

Get the name of the manifest stream represented by this collection

If you open a subcollection with Collection.find, calling this method on that subcollection returns the name of the stream you opened.

@synchronized
def clone( self, new_parent: Optional[Collection] = None, new_name: Optional[str] = None) -> Subcollection:
1797    @synchronized
1798    def clone(
1799            self,
1800            new_parent: Optional['Collection']=None,
1801            new_name: Optional[str]=None,
1802    ) -> 'Subcollection':
1803        c = Subcollection(new_parent, new_name)
1804        c._clonefrom(self)
1805        return c
class CollectionReader(Collection):
class CollectionReader(Collection):
    """Read-only `Collection` subclass

    This class will never create or update any API collection records. You can
    use this class for additional code safety when you only need to read
    existing collections.
    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # writable() reports True only while this flag is set, letting the
        # parent constructor populate the collection.
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatibility with old CollectionReader all_streams()
        # and all_files(); parsed lazily by _populate_streams.
        self._streams = None

    def writable(self) -> bool:
        """Indicate whether this collection object can be modified"""
        return self._in_init

    def _populate_streams(orig_func):
        # Decorator: parse self._manifest_text into self._streams on first
        # use, deferred because it creates a copy of the manifest.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                manifest = self._manifest_text
                if manifest:
                    self._streams = [
                        line.split()
                        for line in manifest.split("\n")
                        if line
                    ]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return wrapper

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def normalize(self):
        """Normalize the streams returned by `all_streams`"""
        streams = {}
        for stream in self.all_streams():
            for file_reader in stream.all_files():
                streamname, filename = split(stream.name() + "/" + file_reader.name())
                file_map = streams.setdefault(streamname, {})
                ranges = file_map.setdefault(filename, [])
                for seg in file_reader.segments:
                    ranges.extend(stream.locators_and_ranges(seg.locator, seg.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_files(self):
        for stream in self.all_streams():
            for f in stream.all_files():
                yield f

Read-only Collection subclass

This class will never create or update any API collection records. You can use this class for additional code safety when you only need to read existing collections.

CollectionReader(manifest_locator_or_text, *args, **kwargs)
1835    def __init__(self, manifest_locator_or_text, *args, **kwargs):
1836        self._in_init = True
1837        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1838        self._in_init = False
1839
1840        # Forego any locking since it should never change once initialized.
1841        self.lock = NoopLock()
1842
1843        # Backwards compatibility with old CollectionReader
1844        # all_streams() and all_files()
1845        self._streams = None

Initialize a Collection object

Arguments:

  • manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value None instantiates a new collection with an empty manifest.

  • api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from apiconfig (see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling arvados.api.api to construct an arvados.safeapi.ThreadSafeApiCache, and use that as the api_client for every Collection.

  • keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its api_client.

  • num_retries: int — The number of times that client requests are retried. Default 10.

  • parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • apiconfig: Mapping[str, str] | None — A mapping with entries for ARVADOS_API_HOST, ARVADOS_API_TOKEN, and optionally ARVADOS_API_HOST_INSECURE. When no api_client is provided, the Collection object constructs one from these settings. If no mapping is provided, calls arvados.config.settings to get these parameters from user configuration.

  • block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.

  • replication_desired: int | None — This controls both the value of the replication_desired field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.

  • storage_classes_desired: list[str] | None — This controls both the value of the storage_classes_desired field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.

  • put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new block_manager. It is unused when a block_manager is provided.

lock
def writable(self) -> bool:
1847    def writable(self) -> bool:
1848        return self._in_init

Indicate whether this collection object can be modified

This method returns False if this object is a CollectionReader, else True.

@arvados.util._deprecated('3.0', 'Collection iteration')
def normalize(self):
1864    @arvados.util._deprecated('3.0', 'Collection iteration')
1865    @_populate_streams
1866    def normalize(self):
1867        """Normalize the streams returned by `all_streams`"""
1868        streams = {}
1869        for s in self.all_streams():
1870            for f in s.all_files():
1871                streamname, filename = split(s.name() + "/" + f.name())
1872                if streamname not in streams:
1873                    streams[streamname] = {}
1874                if filename not in streams[streamname]:
1875                    streams[streamname][filename] = []
1876                for r in f.segments:
1877                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1878
1879        self._streams = [normalize_stream(s, streams[s])
1880                         for s in sorted(streams)]

Normalize the streams returned by all_streams

@arvados.util._deprecated('3.0', 'Collection iteration')
def all_streams(self):
1882    @arvados.util._deprecated('3.0', 'Collection iteration')
1883    @_populate_streams
1884    def all_streams(self):
1885        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1886                for s in self._streams]
@arvados.util._deprecated('3.0', 'Collection iteration')
def all_files(self):
1888    @arvados.util._deprecated('3.0', 'Collection iteration')
1889    @_populate_streams
1890    def all_files(self):
1891        for s in self.all_streams():
1892            for f in s.all_files():
1893                yield f
class CollectionWriter(CollectionBase):
1896class CollectionWriter(CollectionBase):
1897    """Create a new collection from scratch
1898
1899    .. WARNING:: Deprecated
1900       This class is deprecated. Prefer `arvados.collection.Collection`
1901       instead.
1902    """
1903
1904    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1905    def __init__(self, api_client=None, num_retries=0, replication=None):
1906        """Instantiate a CollectionWriter.
1907
1908        CollectionWriter lets you build a new Arvados Collection from scratch.
1909        Write files to it.  The CollectionWriter will upload data to Keep as
1910        appropriate, and provide you with the Collection manifest text when
1911        you're finished.
1912
1913        Arguments:
1914        * api_client: The API client to use to look up Collections.  If not
1915          provided, CollectionReader will build one from available Arvados
1916          configuration.
1917        * num_retries: The default number of times to retry failed
1918          service requests.  Default 0.  You may change this value
1919          after instantiation, but note those changes may not
1920          propagate to related objects like the Keep client.
1921        * replication: The number of copies of each block to store.
1922          If this argument is None or not supplied, replication is
1923          the server-provided default if available, otherwise 2.
1924        """
1925        self._api_client = api_client
1926        self.num_retries = num_retries
1927        self.replication = (2 if replication is None else replication)
1928        self._keep_client = None
1929        self._data_buffer = []
1930        self._data_buffer_len = 0
1931        self._current_stream_files = []
1932        self._current_stream_length = 0
1933        self._current_stream_locators = []
1934        self._current_stream_name = '.'
1935        self._current_file_name = None
1936        self._current_file_pos = 0
1937        self._finished_streams = []
1938        self._close_file = None
1939        self._queued_file = None
1940        self._queued_dirents = deque()
1941        self._queued_trees = deque()
1942        self._last_open = None
1943
1944    def __exit__(self, exc_type, exc_value, traceback):
1945        if exc_type is None:
1946            self.finish()
1947
1948    def do_queued_work(self):
1949        # The work queue consists of three pieces:
1950        # * _queued_file: The file object we're currently writing to the
1951        #   Collection.
1952        # * _queued_dirents: Entries under the current directory
1953        #   (_queued_trees[0]) that we want to write or recurse through.
1954        #   This may contain files from subdirectories if
1955        #   max_manifest_depth == 0 for this directory.
1956        # * _queued_trees: Directories that should be written as separate
1957        #   streams to the Collection.
1958        # This function handles the smallest piece of work currently queued
1959        # (current file, then current directory, then next directory) until
1960        # no work remains.  The _work_THING methods each do a unit of work on
1961        # THING.  _queue_THING methods add a THING to the work queue.
1962        while True:
1963            if self._queued_file:
1964                self._work_file()
1965            elif self._queued_dirents:
1966                self._work_dirents()
1967            elif self._queued_trees:
1968                self._work_trees()
1969            else:
1970                break
1971
1972    def _work_file(self):
1973        while True:
1974            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
1975            if not buf:
1976                break
1977            self.write(buf)
1978        self.finish_current_file()
1979        if self._close_file:
1980            self._queued_file.close()
1981        self._close_file = None
1982        self._queued_file = None
1983
1984    def _work_dirents(self):
1985        path, stream_name, max_manifest_depth = self._queued_trees[0]
1986        if stream_name != self.current_stream_name():
1987            self.start_new_stream(stream_name)
1988        while self._queued_dirents:
1989            dirent = self._queued_dirents.popleft()
1990            target = os.path.join(path, dirent)
1991            if os.path.isdir(target):
1992                self._queue_tree(target,
1993                                 os.path.join(stream_name, dirent),
1994                                 max_manifest_depth - 1)
1995            else:
1996                self._queue_file(target, dirent)
1997                break
1998        if not self._queued_dirents:
1999            self._queued_trees.popleft()
2000
2001    def _work_trees(self):
2002        path, stream_name, max_manifest_depth = self._queued_trees[0]
2003        d = arvados.util.listdir_recursive(
2004            path, max_depth = (None if max_manifest_depth == 0 else 0))
2005        if d:
2006            self._queue_dirents(stream_name, d)
2007        else:
2008            self._queued_trees.popleft()
2009
2010    def _queue_file(self, source, filename=None):
2011        assert (self._queued_file is None), "tried to queue more than one file"
2012        if not hasattr(source, 'read'):
2013            source = open(source, 'rb')
2014            self._close_file = True
2015        else:
2016            self._close_file = False
2017        if filename is None:
2018            filename = os.path.basename(source.name)
2019        self.start_new_file(filename)
2020        self._queued_file = source
2021
2022    def _queue_dirents(self, stream_name, dirents):
2023        assert (not self._queued_dirents), "tried to queue more than one tree"
2024        self._queued_dirents = deque(sorted(dirents))
2025
2026    def _queue_tree(self, path, stream_name, max_manifest_depth):
2027        self._queued_trees.append((path, stream_name, max_manifest_depth))
2028
2029    def write_file(self, source, filename=None):
2030        self._queue_file(source, filename)
2031        self.do_queued_work()
2032
2033    def write_directory_tree(self,
2034                             path, stream_name='.', max_manifest_depth=-1):
2035        self._queue_tree(path, stream_name, max_manifest_depth)
2036        self.do_queued_work()
2037
2038    def write(self, newdata):
2039        if isinstance(newdata, bytes):
2040            pass
2041        elif isinstance(newdata, str):
2042            newdata = newdata.encode()
2043        elif hasattr(newdata, '__iter__'):
2044            for s in newdata:
2045                self.write(s)
2046            return
2047        self._data_buffer.append(newdata)
2048        self._data_buffer_len += len(newdata)
2049        self._current_stream_length += len(newdata)
2050        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2051            self.flush_data()
2052
2053    def open(self, streampath, filename=None):
2054        """open(streampath[, filename]) -> file-like object
2055
2056        Pass in the path of a file to write to the Collection, either as a
2057        single string or as two separate stream name and file name arguments.
2058        This method returns a file-like object you can write to add it to the
2059        Collection.
2060
2061        You may only have one file object from the Collection open at a time,
2062        so be sure to close the object when you're done.  Using the object in
2063        a with statement makes that easy:
2064
2065            with cwriter.open('./doc/page1.txt') as outfile:
2066                outfile.write(page1_data)
2067            with cwriter.open('./doc/page2.txt') as outfile:
2068                outfile.write(page2_data)
2069        """
2070        if filename is None:
2071            streampath, filename = split(streampath)
2072        if self._last_open and not self._last_open.closed:
2073            raise errors.AssertionError(
2074                u"can't open '{}' when '{}' is still open".format(
2075                    filename, self._last_open.name))
2076        if streampath != self.current_stream_name():
2077            self.start_new_stream(streampath)
2078        self.set_current_file_name(filename)
2079        self._last_open = _WriterFile(self, filename)
2080        return self._last_open
2081
2082    def flush_data(self):
2083        data_buffer = b''.join(self._data_buffer)
2084        if data_buffer:
2085            self._current_stream_locators.append(
2086                self._my_keep().put(
2087                    data_buffer[0:config.KEEP_BLOCK_SIZE],
2088                    copies=self.replication))
2089            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2090            self._data_buffer_len = len(self._data_buffer[0])
2091
2092    def start_new_file(self, newfilename=None):
2093        self.finish_current_file()
2094        self.set_current_file_name(newfilename)
2095
2096    def set_current_file_name(self, newfilename):
2097        if re.search(r'[\t\n]', newfilename):
2098            raise errors.AssertionError(
2099                "Manifest filenames cannot contain whitespace: %s" %
2100                newfilename)
2101        elif re.search(r'\x00', newfilename):
2102            raise errors.AssertionError(
2103                "Manifest filenames cannot contain NUL characters: %s" %
2104                newfilename)
2105        self._current_file_name = newfilename
2106
2107    def current_file_name(self):
2108        return self._current_file_name
2109
2110    def finish_current_file(self):
2111        if self._current_file_name is None:
2112            if self._current_file_pos == self._current_stream_length:
2113                return
2114            raise errors.AssertionError(
2115                "Cannot finish an unnamed file " +
2116                "(%d bytes at offset %d in '%s' stream)" %
2117                (self._current_stream_length - self._current_file_pos,
2118                 self._current_file_pos,
2119                 self._current_stream_name))
2120        self._current_stream_files.append([
2121                self._current_file_pos,
2122                self._current_stream_length - self._current_file_pos,
2123                self._current_file_name])
2124        self._current_file_pos = self._current_stream_length
2125        self._current_file_name = None
2126
2127    def start_new_stream(self, newstreamname='.'):
2128        self.finish_current_stream()
2129        self.set_current_stream_name(newstreamname)
2130
2131    def set_current_stream_name(self, newstreamname):
2132        if re.search(r'[\t\n]', newstreamname):
2133            raise errors.AssertionError(
2134                "Manifest stream names cannot contain whitespace: '%s'" %
2135                (newstreamname))
2136        self._current_stream_name = '.' if newstreamname=='' else newstreamname
2137
2138    def current_stream_name(self):
2139        return self._current_stream_name
2140
2141    def finish_current_stream(self):
2142        self.finish_current_file()
2143        self.flush_data()
2144        if not self._current_stream_files:
2145            pass
2146        elif self._current_stream_name is None:
2147            raise errors.AssertionError(
2148                "Cannot finish an unnamed stream (%d bytes in %d files)" %
2149                (self._current_stream_length, len(self._current_stream_files)))
2150        else:
2151            if not self._current_stream_locators:
2152                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2153            self._finished_streams.append([self._current_stream_name,
2154                                           self._current_stream_locators,
2155                                           self._current_stream_files])
2156        self._current_stream_files = []
2157        self._current_stream_length = 0
2158        self._current_stream_locators = []
2159        self._current_stream_name = None
2160        self._current_file_pos = 0
2161        self._current_file_name = None
2162
2163    def finish(self):
2164        """Store the manifest in Keep and return its locator.
2165
2166        This is useful for storing manifest fragments (task outputs)
2167        temporarily in Keep during a Crunch job.
2168
2169        In other cases you should make a collection instead, by
2170        sending manifest_text() to the API server's "create
2171        collection" endpoint.
2172        """
2173        return self._my_keep().put(self.manifest_text().encode(),
2174                                   copies=self.replication)
2175
2176    def portable_data_hash(self):
2177        stripped = self.stripped_manifest().encode()
2178        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
2179
2180    def manifest_text(self):
2181        self.finish_current_stream()
2182        manifest = ''
2183
2184        for stream in self._finished_streams:
2185            if not re.search(r'^\.(/.*)?$', stream[0]):
2186                manifest += './'
2187            manifest += stream[0].replace(' ', '\\040')
2188            manifest += ' ' + ' '.join(stream[1])
2189            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2190            manifest += "\n"
2191
2192        return manifest
2193
2194    def data_locators(self):
2195        ret = []
2196        for name, locators, files in self._finished_streams:
2197            ret += locators
2198        return ret
2199
2200    def save_new(self, name=None):
2201        return self._api_client.collections().create(
2202            ensure_unique_name=True,
2203            body={
2204                'name': name,
2205                'manifest_text': self.manifest_text(),
2206            }).execute(num_retries=self.num_retries)

Create a new collection from scratch

@arvados.util._deprecated('3.0', 'arvados.collection.Collection')
CollectionWriter(api_client=None, num_retries=0, replication=None)
1904    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1905    def __init__(self, api_client=None, num_retries=0, replication=None):
1906        """Instantiate a CollectionWriter.
1907
1908        CollectionWriter lets you build a new Arvados Collection from scratch.
1909        Write files to it.  The CollectionWriter will upload data to Keep as
1910        appropriate, and provide you with the Collection manifest text when
1911        you're finished.
1912
1913        Arguments:
1914        * api_client: The API client to use to look up Collections.  If not
1915          provided, CollectionWriter will build one from available Arvados
1916          configuration.
1917        * num_retries: The default number of times to retry failed
1918          service requests.  Default 0.  You may change this value
1919          after instantiation, but note those changes may not
1920          propagate to related objects like the Keep client.
1921        * replication: The number of copies of each block to store.
1922          If this argument is None or not supplied, replication is
1923          the server-provided default if available, otherwise 2.
1924        """
1925        self._api_client = api_client
1926        self.num_retries = num_retries
1927        self.replication = (2 if replication is None else replication)
1928        self._keep_client = None
1929        self._data_buffer = []
1930        self._data_buffer_len = 0
1931        self._current_stream_files = []
1932        self._current_stream_length = 0
1933        self._current_stream_locators = []
1934        self._current_stream_name = '.'
1935        self._current_file_name = None
1936        self._current_file_pos = 0
1937        self._finished_streams = []
1938        self._close_file = None
1939        self._queued_file = None
1940        self._queued_dirents = deque()
1941        self._queued_trees = deque()
1942        self._last_open = None

Instantiate a CollectionWriter.

CollectionWriter lets you build a new Arvados Collection from scratch. Write files to it. The CollectionWriter will upload data to Keep as appropriate, and provide you with the Collection manifest text when you’re finished.

Arguments:

  • api_client: The API client to use to look up Collections. If not provided, CollectionWriter will build one from available Arvados configuration.
  • num_retries: The default number of times to retry failed service requests. Default 0. You may change this value after instantiation, but note those changes may not propagate to related objects like the Keep client.
  • replication: The number of copies of each block to store. If this argument is None or not supplied, replication is the server-provided default if available, otherwise 2.
num_retries
replication
def do_queued_work(self):
1948    def do_queued_work(self):
1949        # The work queue consists of three pieces:
1950        # * _queued_file: The file object we're currently writing to the
1951        #   Collection.
1952        # * _queued_dirents: Entries under the current directory
1953        #   (_queued_trees[0]) that we want to write or recurse through.
1954        #   This may contain files from subdirectories if
1955        #   max_manifest_depth == 0 for this directory.
1956        # * _queued_trees: Directories that should be written as separate
1957        #   streams to the Collection.
1958        # This function handles the smallest piece of work currently queued
1959        # (current file, then current directory, then next directory) until
1960        # no work remains.  The _work_THING methods each do a unit of work on
1961        # THING.  _queue_THING methods add a THING to the work queue.
1962        while True:
1963            if self._queued_file:
1964                self._work_file()
1965            elif self._queued_dirents:
1966                self._work_dirents()
1967            elif self._queued_trees:
1968                self._work_trees()
1969            else:
1970                break
def write_file(self, source, filename=None):
2029    def write_file(self, source, filename=None):
2030        self._queue_file(source, filename)
2031        self.do_queued_work()
def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
2033    def write_directory_tree(self,
2034                             path, stream_name='.', max_manifest_depth=-1):
2035        self._queue_tree(path, stream_name, max_manifest_depth)
2036        self.do_queued_work()
def write(self, newdata):
2038    def write(self, newdata):
2039        if isinstance(newdata, bytes):
2040            pass
2041        elif isinstance(newdata, str):
2042            newdata = newdata.encode()
2043        elif hasattr(newdata, '__iter__'):
2044            for s in newdata:
2045                self.write(s)
2046            return
2047        self._data_buffer.append(newdata)
2048        self._data_buffer_len += len(newdata)
2049        self._current_stream_length += len(newdata)
2050        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2051            self.flush_data()
def open(self, streampath, filename=None):
2053    def open(self, streampath, filename=None):
2054        """open(streampath[, filename]) -> file-like object
2055
2056        Pass in the path of a file to write to the Collection, either as a
2057        single string or as two separate stream name and file name arguments.
2058        This method returns a file-like object you can write to add it to the
2059        Collection.
2060
2061        You may only have one file object from the Collection open at a time,
2062        so be sure to close the object when you're done.  Using the object in
2063        a with statement makes that easy:
2064
2065            with cwriter.open('./doc/page1.txt') as outfile:
2066                outfile.write(page1_data)
2067            with cwriter.open('./doc/page2.txt') as outfile:
2068                outfile.write(page2_data)
2069        """
2070        if filename is None:
2071            streampath, filename = split(streampath)
2072        if self._last_open and not self._last_open.closed:
2073            raise errors.AssertionError(
2074                u"can't open '{}' when '{}' is still open".format(
2075                    filename, self._last_open.name))
2076        if streampath != self.current_stream_name():
2077            self.start_new_stream(streampath)
2078        self.set_current_file_name(filename)
2079        self._last_open = _WriterFile(self, filename)
2080        return self._last_open

open(streampath[, filename]) -> file-like object

Pass in the path of a file to write to the Collection, either as a single string or as two separate stream name and file name arguments. This method returns a file-like object you can write to add it to the Collection.

You may only have one file object from the Collection open at a time, so be sure to close the object when you’re done. Using the object in a with statement makes that easy:

with cwriter.open('./doc/page1.txt') as outfile:
    outfile.write(page1_data)
with cwriter.open('./doc/page2.txt') as outfile:
    outfile.write(page2_data)
def flush_data(self):
2082    def flush_data(self):
2083        data_buffer = b''.join(self._data_buffer)
2084        if data_buffer:
2085            self._current_stream_locators.append(
2086                self._my_keep().put(
2087                    data_buffer[0:config.KEEP_BLOCK_SIZE],
2088                    copies=self.replication))
2089            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2090            self._data_buffer_len = len(self._data_buffer[0])
def start_new_file(self, newfilename=None):
2092    def start_new_file(self, newfilename=None):
2093        self.finish_current_file()
2094        self.set_current_file_name(newfilename)
def set_current_file_name(self, newfilename):
2096    def set_current_file_name(self, newfilename):
2097        if re.search(r'[\t\n]', newfilename):
2098            raise errors.AssertionError(
2099                "Manifest filenames cannot contain whitespace: %s" %
2100                newfilename)
2101        elif re.search(r'\x00', newfilename):
2102            raise errors.AssertionError(
2103                "Manifest filenames cannot contain NUL characters: %s" %
2104                newfilename)
2105        self._current_file_name = newfilename
def current_file_name(self):
2107    def current_file_name(self):
2108        return self._current_file_name
def finish_current_file(self):
2110    def finish_current_file(self):
2111        if self._current_file_name is None:
2112            if self._current_file_pos == self._current_stream_length:
2113                return
2114            raise errors.AssertionError(
2115                "Cannot finish an unnamed file " +
2116                "(%d bytes at offset %d in '%s' stream)" %
2117                (self._current_stream_length - self._current_file_pos,
2118                 self._current_file_pos,
2119                 self._current_stream_name))
2120        self._current_stream_files.append([
2121                self._current_file_pos,
2122                self._current_stream_length - self._current_file_pos,
2123                self._current_file_name])
2124        self._current_file_pos = self._current_stream_length
2125        self._current_file_name = None
def start_new_stream(self, newstreamname='.'):
2127    def start_new_stream(self, newstreamname='.'):
2128        self.finish_current_stream()
2129        self.set_current_stream_name(newstreamname)
def set_current_stream_name(self, newstreamname):
2131    def set_current_stream_name(self, newstreamname):
2132        if re.search(r'[\t\n]', newstreamname):
2133            raise errors.AssertionError(
2134                "Manifest stream names cannot contain whitespace: '%s'" %
2135                (newstreamname))
2136        self._current_stream_name = '.' if newstreamname=='' else newstreamname
def current_stream_name(self):
2138    def current_stream_name(self):
2139        return self._current_stream_name
def finish_current_stream(self):
2141    def finish_current_stream(self):
2142        self.finish_current_file()
2143        self.flush_data()
2144        if not self._current_stream_files:
2145            pass
2146        elif self._current_stream_name is None:
2147            raise errors.AssertionError(
2148                "Cannot finish an unnamed stream (%d bytes in %d files)" %
2149                (self._current_stream_length, len(self._current_stream_files)))
2150        else:
2151            if not self._current_stream_locators:
2152                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2153            self._finished_streams.append([self._current_stream_name,
2154                                           self._current_stream_locators,
2155                                           self._current_stream_files])
2156        self._current_stream_files = []
2157        self._current_stream_length = 0
2158        self._current_stream_locators = []
2159        self._current_stream_name = None
2160        self._current_file_pos = 0
2161        self._current_file_name = None
def finish(self):
2163    def finish(self):
2164        """Store the manifest in Keep and return its locator.
2165
2166        This is useful for storing manifest fragments (task outputs)
2167        temporarily in Keep during a Crunch job.
2168
2169        In other cases you should make a collection instead, by
2170        sending manifest_text() to the API server's "create
2171        collection" endpoint.
2172        """
2173        return self._my_keep().put(self.manifest_text().encode(),
2174                                   copies=self.replication)

Store the manifest in Keep and return its locator.

This is useful for storing manifest fragments (task outputs) temporarily in Keep during a Crunch job.

In other cases you should make a collection instead, by sending manifest_text() to the API server’s “create collection” endpoint.

def portable_data_hash(self):
2176    def portable_data_hash(self):
2177        stripped = self.stripped_manifest().encode()
2178        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
def manifest_text(self):
2180    def manifest_text(self):
2181        self.finish_current_stream()
2182        manifest = ''
2183
2184        for stream in self._finished_streams:
2185            if not re.search(r'^\.(/.*)?$', stream[0]):
2186                manifest += './'
2187            manifest += stream[0].replace(' ', '\\040')
2188            manifest += ' ' + ' '.join(stream[1])
2189            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2190            manifest += "\n"
2191
2192        return manifest
def data_locators(self):
2194    def data_locators(self):
2195        ret = []
2196        for name, locators, files in self._finished_streams:
2197            ret += locators
2198        return ret
def save_new(self, name=None):
2200    def save_new(self, name=None):
2201        return self._api_client.collections().create(
2202            ensure_unique_name=True,
2203            body={
2204                'name': name,
2205                'manifest_text': self.manifest_text(),
2206            }).execute(num_retries=self.num_retries)
Inherited Members
CollectionBase
stripped_manifest
class ResumableCollectionWriter(CollectionWriter):
2209class ResumableCollectionWriter(CollectionWriter):
2210    """CollectionWriter that can serialize internal state to disk
2211
2212    .. WARNING:: Deprecated
2213       This class is deprecated. Prefer `arvados.collection.Collection`
2214       instead.
2215    """
2216
2217    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
2218                   '_current_stream_locators', '_current_stream_name',
2219                   '_current_file_name', '_current_file_pos', '_close_file',
2220                   '_data_buffer', '_dependencies', '_finished_streams',
2221                   '_queued_dirents', '_queued_trees']
2222
2223    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
2224    def __init__(self, api_client=None, **kwargs):
2225        self._dependencies = {}
2226        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
2227
2228    @classmethod
2229    def from_state(cls, state, *init_args, **init_kwargs):
2230        # Try to build a new writer from scratch with the given state.
2231        # If the state is not suitable to resume (because files have changed,
2232        # been deleted, aren't predictable, etc.), raise a
2233        # StaleWriterStateError.  Otherwise, return the initialized writer.
2234        # The caller is responsible for calling writer.do_queued_work()
2235        # appropriately after it's returned.
2236        writer = cls(*init_args, **init_kwargs)
2237        for attr_name in cls.STATE_PROPS:
2238            attr_value = state[attr_name]
2239            attr_class = getattr(writer, attr_name).__class__
2240            # Coerce the value into the same type as the initial value, if
2241            # needed.
2242            if attr_class not in (type(None), attr_value.__class__):
2243                attr_value = attr_class(attr_value)
2244            setattr(writer, attr_name, attr_value)
2245        # Check dependencies before we try to resume anything.
2246        if any(KeepLocator(ls).permission_expired()
2247               for ls in writer._current_stream_locators):
2248            raise errors.StaleWriterStateError(
2249                "locators include expired permission hint")
2250        writer.check_dependencies()
2251        if state['_current_file'] is not None:
2252            path, pos = state['_current_file']
2253            try:
2254                writer._queued_file = open(path, 'rb')
2255                writer._queued_file.seek(pos)
2256            except IOError as error:
2257                raise errors.StaleWriterStateError(
2258                    u"failed to reopen active file {}: {}".format(path, error))
2259        return writer
2260
2261    def check_dependencies(self):
2262        for path, orig_stat in listitems(self._dependencies):
2263            if not S_ISREG(orig_stat[ST_MODE]):
2264                raise errors.StaleWriterStateError(u"{} not file".format(path))
2265            try:
2266                now_stat = tuple(os.stat(path))
2267            except OSError as error:
2268                raise errors.StaleWriterStateError(
2269                    u"failed to stat {}: {}".format(path, error))
2270            if ((not S_ISREG(now_stat[ST_MODE])) or
2271                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
2272                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
2273                raise errors.StaleWriterStateError(u"{} changed".format(path))
2274
2275    def dump_state(self, copy_func=lambda x: x):
2276        state = {attr: copy_func(getattr(self, attr))
2277                 for attr in self.STATE_PROPS}
2278        if self._queued_file is None:
2279            state['_current_file'] = None
2280        else:
2281            state['_current_file'] = (os.path.realpath(self._queued_file.name),
2282                                      self._queued_file.tell())
2283        return state
2284
2285    def _queue_file(self, source, filename=None):
2286        try:
2287            src_path = os.path.realpath(source)
2288        except Exception:
2289            raise errors.AssertionError(u"{} not a file path".format(source))
2290        try:
2291            path_stat = os.stat(src_path)
2292        except OSError as stat_error:
2293            path_stat = None
2294        super(ResumableCollectionWriter, self)._queue_file(source, filename)
2295        fd_stat = os.fstat(self._queued_file.fileno())
2296        if not S_ISREG(fd_stat.st_mode):
2297            # We won't be able to resume from this cache anyway, so don't
2298            # worry about further checks.
2299            self._dependencies[source] = tuple(fd_stat)
2300        elif path_stat is None:
2301            raise errors.AssertionError(
2302                u"could not stat {}: {}".format(source, stat_error))
2303        elif path_stat.st_ino != fd_stat.st_ino:
2304            raise errors.AssertionError(
2305                u"{} changed between open and stat calls".format(source))
2306        else:
2307            self._dependencies[src_path] = tuple(fd_stat)
2308
2309    def write(self, data):
2310        if self._queued_file is None:
2311            raise errors.AssertionError(
2312                "resumable writer can't accept unsourced data")
2313        return super(ResumableCollectionWriter, self).write(data)

CollectionWriter that can serialize internal state to disk

@arvados.util._deprecated('3.0', 'arvados.collection.Collection')
ResumableCollectionWriter(api_client=None, **kwargs)
2223    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
2224    def __init__(self, api_client=None, **kwargs):
2225        self._dependencies = {}
2226        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
STATE_PROPS = ['_current_stream_files', '_current_stream_length', '_current_stream_locators', '_current_stream_name', '_current_file_name', '_current_file_pos', '_close_file', '_data_buffer', '_dependencies', '_finished_streams', '_queued_dirents', '_queued_trees']
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):
2228    @classmethod
2229    def from_state(cls, state, *init_args, **init_kwargs):
2230        # Try to build a new writer from scratch with the given state.
2231        # If the state is not suitable to resume (because files have changed,
2232        # been deleted, aren't predictable, etc.), raise a
2233        # StaleWriterStateError.  Otherwise, return the initialized writer.
2234        # The caller is responsible for calling writer.do_queued_work()
2235        # appropriately after it's returned.
2236        writer = cls(*init_args, **init_kwargs)
2237        for attr_name in cls.STATE_PROPS:
2238            attr_value = state[attr_name]
2239            attr_class = getattr(writer, attr_name).__class__
2240            # Coerce the value into the same type as the initial value, if
2241            # needed.
2242            if attr_class not in (type(None), attr_value.__class__):
2243                attr_value = attr_class(attr_value)
2244            setattr(writer, attr_name, attr_value)
2245        # Check dependencies before we try to resume anything.
2246        if any(KeepLocator(ls).permission_expired()
2247               for ls in writer._current_stream_locators):
2248            raise errors.StaleWriterStateError(
2249                "locators include expired permission hint")
2250        writer.check_dependencies()
2251        if state['_current_file'] is not None:
2252            path, pos = state['_current_file']
2253            try:
2254                writer._queued_file = open(path, 'rb')
2255                writer._queued_file.seek(pos)
2256            except IOError as error:
2257                raise errors.StaleWriterStateError(
2258                    u"failed to reopen active file {}: {}".format(path, error))
2259        return writer
def check_dependencies(self):
2261    def check_dependencies(self):
2262        for path, orig_stat in listitems(self._dependencies):
2263            if not S_ISREG(orig_stat[ST_MODE]):
2264                raise errors.StaleWriterStateError(u"{} not file".format(path))
2265            try:
2266                now_stat = tuple(os.stat(path))
2267            except OSError as error:
2268                raise errors.StaleWriterStateError(
2269                    u"failed to stat {}: {}".format(path, error))
2270            if ((not S_ISREG(now_stat[ST_MODE])) or
2271                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
2272                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
2273                raise errors.StaleWriterStateError(u"{} changed".format(path))
def dump_state(self, copy_func=<function ResumableCollectionWriter.<lambda>>):
2275    def dump_state(self, copy_func=lambda x: x):
2276        state = {attr: copy_func(getattr(self, attr))
2277                 for attr in self.STATE_PROPS}
2278        if self._queued_file is None:
2279            state['_current_file'] = None
2280        else:
2281            state['_current_file'] = (os.path.realpath(self._queued_file.name),
2282                                      self._queued_file.tell())
2283        return state
def write(self, data):
2309    def write(self, data):
2310        if self._queued_file is None:
2311            raise errors.AssertionError(
2312                "resumable writer can't accept unsourced data")
2313        return super(ResumableCollectionWriter, self).write(data)