arvados.collection
Tools to work with Arvados collections
This module provides high-level interfaces to create, read, and update
Arvados collections. Most users will want to instantiate Collection
objects, and use methods like Collection.open
and Collection.mkdirs
to
read and write data in the collection. Refer to the Arvados Python SDK
cookbook for an introduction to using the Collection class.
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4"""Tools to work with Arvados collections 5 6This module provides high-level interfaces to create, read, and update 7Arvados collections. Most users will want to instantiate `Collection` 8objects, and use methods like `Collection.open` and `Collection.mkdirs` to 9read and write data in the collection. Refer to the Arvados Python SDK 10cookbook for [an introduction to using the Collection class][cookbook]. 11 12[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 13""" 14 15import ciso8601 16import datetime 17import errno 18import functools 19import hashlib 20import io 21import logging 22import os 23import re 24import sys 25import threading 26import time 27 28from collections import deque 29from stat import * 30 31from ._internal import streams 32from .api import ThreadSafeAPIClient 33from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock 34from .keep import KeepLocator, KeepClient 35import arvados.config as config 36import arvados.errors as errors 37import arvados.util 38import arvados.events as events 39from arvados.retry import retry_method 40 41from typing import ( 42 Any, 43 Callable, 44 Dict, 45 IO, 46 Iterator, 47 List, 48 Mapping, 49 Optional, 50 Tuple, 51 Union, 52) 53 54if sys.version_info < (3, 8): 55 from typing_extensions import Literal 56else: 57 from typing import Literal 58 59_logger = logging.getLogger('arvados.collection') 60 61ADD = "add" 62"""Argument value for `Collection` methods to represent an added item""" 63DEL = "del" 64"""Argument value for `Collection` methods to represent a removed item""" 65MOD = "mod" 66"""Argument value for `Collection` methods to represent a modified item""" 67TOK = "tok" 68"""Argument value for `Collection` methods to represent an item with token differences""" 69FILE = "file" 70"""`create_type` value for `Collection.find_or_create`""" 71COLLECTION = "collection" 72"""`create_type` value for `Collection.find_or_create`""" 73 74ChangeList = List[Union[ 75 Tuple[Literal[ADD, DEL], str, 'Collection'], 76 Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'], 77]] 78ChangeType = Literal[ADD, DEL, MOD, TOK] 79CollectionItem = Union[ArvadosFile, 'Collection'] 80ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object] 81CreateType = Literal[COLLECTION, FILE] 82Properties = Dict[str, Any] 83StorageClasses = List[str] 84 85class CollectionBase(object): 86 """Abstract base class for Collection classes 87 88 .. ATTENTION:: Internal 89 This class is meant to be used by other parts of the SDK. User code 90 should instantiate or subclass `Collection` or one of its subclasses 91 directly. 92 """ 93 94 def __enter__(self): 95 """Enter a context block with this collection instance""" 96 return self 97 98 def __exit__(self, exc_type, exc_value, traceback): 99 """Exit a context block with this collection instance""" 100 pass 101 102 def _my_keep(self): 103 if self._keep_client is None: 104 self._keep_client = KeepClient(api_client=self._api_client, 105 num_retries=self.num_retries) 106 return self._keep_client 107 108 def stripped_manifest(self) -> str: 109 """Create a copy of the collection manifest with only size hints 110 111 This method returns a string with the current collection's manifest 112 text with all non-portable locator hints like permission hints and 113 remote cluster hints removed. The only hints in the returned manifest 114 will be size hints. 115 """ 116 raw = self.manifest_text() 117 clean = [] 118 for line in raw.split("\n"): 119 fields = line.split() 120 if fields: 121 clean_fields = fields[:1] + [ 122 (re.sub(r'\+[^\d][^\+]*', '', x) 123 if re.match(arvados.util.keep_locator_pattern, x) 124 else x) 125 for x in fields[1:]] 126 clean += [' '.join(clean_fields), "\n"] 127 return ''.join(clean) 128 129 130class _WriterFile(_FileLikeObjectBase): 131 def __init__(self, coll_writer, name): 132 super(_WriterFile, self).__init__(name, 'wb') 133 self.dest = coll_writer 134 135 def close(self): 136 super(_WriterFile, self).close() 137 self.dest.finish_current_file() 138 139 @_FileLikeObjectBase._before_close 140 def write(self, data): 141 self.dest.write(data) 142 143 @_FileLikeObjectBase._before_close 144 def writelines(self, seq): 145 for data in seq: 146 self.write(data) 147 148 @_FileLikeObjectBase._before_close 149 def flush(self): 150 self.dest.flush_data() 151 152 153class RichCollectionBase(CollectionBase): 154 """Base class for Collection classes 155 156 .. ATTENTION:: Internal 157 This class is meant to be used by other parts of the SDK. User code 158 should instantiate or subclass `Collection` or one of its subclasses 159 directly. 160 """ 161 162 def __init__(self, parent=None): 163 self.parent = parent 164 self._committed = False 165 self._has_remote_blocks = False 166 self._callback = None 167 self._items = {} 168 169 def _my_api(self): 170 raise NotImplementedError() 171 172 def _my_keep(self): 173 raise NotImplementedError() 174 175 def _my_block_manager(self): 176 raise NotImplementedError() 177 178 def writable(self) -> bool: 179 """Indicate whether this collection object can be modified 180 181 This method returns `False` if this object is a `CollectionReader`, 182 else `True`. 183 """ 184 raise NotImplementedError() 185 186 def root_collection(self) -> 'Collection': 187 """Get this collection's root collection object 188 189 If you open a subcollection with `Collection.find`, calling this method 190 on that subcollection returns the source Collection object. 191 """ 192 raise NotImplementedError() 193 194 def stream_name(self) -> str: 195 """Get the name of the manifest stream represented by this collection 196 197 If you open a subcollection with `Collection.find`, calling this method 198 on that subcollection returns the name of the stream you opened. 199 """ 200 raise NotImplementedError() 201 202 @synchronized 203 def has_remote_blocks(self) -> bool: 204 """Indiciate whether the collection refers to remote data 205 206 Returns `True` if the collection manifest includes any Keep locators 207 with a remote hint (`+R`), else `False`. 208 """ 209 if self._has_remote_blocks: 210 return True 211 for item in self: 212 if self[item].has_remote_blocks(): 213 return True 214 return False 215 216 @synchronized 217 def set_has_remote_blocks(self, val: bool) -> None: 218 """Cache whether this collection refers to remote blocks 219 220 .. ATTENTION:: Internal 221 This method is only meant to be used by other Collection methods. 222 223 Set this collection's cached "has remote blocks" flag to the given 224 value. 225 """ 226 self._has_remote_blocks = val 227 if self.parent: 228 self.parent.set_has_remote_blocks(val) 229 230 @must_be_writable 231 @synchronized 232 def find_or_create( 233 self, 234 path: str, 235 create_type: CreateType, 236 ) -> CollectionItem: 237 """Get the item at the given path, creating it if necessary 238 239 If `path` refers to a stream in this collection, returns a 240 corresponding `Subcollection` object. If `path` refers to a file in 241 this collection, returns a corresponding 242 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 243 this collection, then this method creates a new object and returns 244 it, creating parent streams as needed. The type of object created is 245 determined by the value of `create_type`. 246 247 Arguments: 248 249 * path: str --- The path to find or create within this collection. 250 251 * create_type: Literal[COLLECTION, FILE] --- The type of object to 252 create at `path` if one does not exist. Passing `COLLECTION` 253 creates a stream and returns the corresponding 254 `Subcollection`. Passing `FILE` creates a new file and returns the 255 corresponding `arvados.arvfile.ArvadosFile`. 256 """ 257 pathcomponents = path.split("/", 1) 258 if pathcomponents[0]: 259 item = self._items.get(pathcomponents[0]) 260 if len(pathcomponents) == 1: 261 if item is None: 262 # create new file 263 if create_type == COLLECTION: 264 item = Subcollection(self, pathcomponents[0]) 265 else: 266 item = ArvadosFile(self, pathcomponents[0]) 267 self._items[pathcomponents[0]] = item 268 self.set_committed(False) 269 self.notify(ADD, self, pathcomponents[0], item) 270 return item 271 else: 272 if item is None: 273 # create new collection 274 item = Subcollection(self, pathcomponents[0]) 275 self._items[pathcomponents[0]] = item 276 self.set_committed(False) 277 self.notify(ADD, self, pathcomponents[0], item) 278 if isinstance(item, RichCollectionBase): 279 return item.find_or_create(pathcomponents[1], create_type) 280 else: 281 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 282 else: 283 return self 284 285 @synchronized 286 def find(self, path: str) -> CollectionItem: 287 """Get the item at the given path 288 289 If `path` refers to a stream in this collection, returns a 290 corresponding `Subcollection` object. If `path` refers to a file in 291 this collection, returns a corresponding 292 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 293 this collection, then this method raises `NotADirectoryError`. 294 295 Arguments: 296 297 * path: str --- The path to find or create within this collection. 298 """ 299 if not path: 300 raise errors.ArgumentError("Parameter 'path' is empty.") 301 302 pathcomponents = path.split("/", 1) 303 if pathcomponents[0] == '': 304 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 305 306 item = self._items.get(pathcomponents[0]) 307 if item is None: 308 return None 309 elif len(pathcomponents) == 1: 310 return item 311 else: 312 if isinstance(item, RichCollectionBase): 313 if pathcomponents[1]: 314 return item.find(pathcomponents[1]) 315 else: 316 return item 317 else: 318 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 319 320 @synchronized 321 def mkdirs(self, path: str) -> 'Subcollection': 322 """Create and return a subcollection at `path` 323 324 If `path` exists within this collection, raises `FileExistsError`. 325 Otherwise, creates a stream at that path and returns the 326 corresponding `Subcollection`. 327 """ 328 if self.find(path) != None: 329 raise IOError(errno.EEXIST, "Directory or file exists", path) 330 331 return self.find_or_create(path, COLLECTION) 332 333 def open( 334 self, 335 path: str, 336 mode: str="r", 337 encoding: Optional[str]=None 338 ) -> IO: 339 """Open a file-like object within the collection 340 341 This method returns a file-like object that can read and/or write the 342 file located at `path` within the collection. If you attempt to write 343 a `path` that does not exist, the file is created with `find_or_create`. 344 If the file cannot be opened for any other reason, this method raises 345 `OSError` with an appropriate errno. 346 347 Arguments: 348 349 * path: str --- The path of the file to open within this collection 350 351 * mode: str --- The mode to open this file. Supports all the same 352 values as `builtins.open`. 353 354 * encoding: str | None --- The text encoding of the file. Only used 355 when the file is opened in text mode. The default is 356 platform-dependent. 357 358 """ 359 if not re.search(r'^[rwa][bt]?\+?$', mode): 360 raise errors.ArgumentError("Invalid mode {!r}".format(mode)) 361 362 if mode[0] == 'r' and '+' not in mode: 363 fclass = ArvadosFileReader 364 arvfile = self.find(path) 365 elif not self.writable(): 366 raise IOError(errno.EROFS, "Collection is read only") 367 else: 368 fclass = ArvadosFileWriter 369 arvfile = self.find_or_create(path, FILE) 370 371 if arvfile is None: 372 raise IOError(errno.ENOENT, "File not found", path) 373 if not isinstance(arvfile, ArvadosFile): 374 raise IOError(errno.EISDIR, "Is a directory", path) 375 376 if mode[0] == 'w': 377 arvfile.truncate(0) 378 379 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) 380 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) 381 if 'b' not in mode: 382 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader 383 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) 384 return f 385 386 def modified(self) -> bool: 387 """Indicate whether this collection has an API server record 388 389 Returns `False` if this collection corresponds to a record loaded from 390 the API server, `True` otherwise. 391 """ 392 return not self.committed() 393 394 @synchronized 395 def committed(self): 396 """Indicate whether this collection has an API server record 397 398 Returns `True` if this collection corresponds to a record loaded from 399 the API server, `False` otherwise. 400 """ 401 return self._committed 402 403 @synchronized 404 def set_committed(self, value: bool=True): 405 """Cache whether this collection has an API server record 406 407 .. ATTENTION:: Internal 408 This method is only meant to be used by other Collection methods. 409 410 Set this collection's cached "committed" flag to the given 411 value and propagates it as needed. 412 """ 413 if value == self._committed: 414 return 415 if value: 416 for k,v in self._items.items(): 417 v.set_committed(True) 418 self._committed = True 419 else: 420 self._committed = False 421 if self.parent is not None: 422 self.parent.set_committed(False) 423 424 @synchronized 425 def __iter__(self) -> Iterator[str]: 426 """Iterate names of streams and files in this collection 427 428 This method does not recurse. It only iterates the contents of this 429 collection's corresponding stream. 430 """ 431 return iter(self._items) 432 433 @synchronized 434 def __getitem__(self, k: str) -> CollectionItem: 435 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection 436 437 This method does not recurse. If you want to search a path, use 438 `RichCollectionBase.find` instead. 439 """ 440 return self._items[k] 441 442 @synchronized 443 def __contains__(self, k: str) -> bool: 444 """Indicate whether this collection has an item with this name 445 446 This method does not recurse. It you want to check a path, use 447 `RichCollectionBase.exists` instead. 448 """ 449 return k in self._items 450 451 @synchronized 452 def __len__(self): 453 """Get the number of items directly contained in this collection 454 455 This method does not recurse. It only counts the streams and files 456 in this collection's corresponding stream. 457 """ 458 return len(self._items) 459 460 @must_be_writable 461 @synchronized 462 def __delitem__(self, p: str) -> None: 463 """Delete an item from this collection's stream 464 465 This method does not recurse. If you want to remove an item by a 466 path, use `RichCollectionBase.remove` instead. 467 """ 468 del self._items[p] 469 self.set_committed(False) 470 self.notify(DEL, self, p, None) 471 472 @synchronized 473 def keys(self) -> Iterator[str]: 474 """Iterate names of streams and files in this collection 475 476 This method does not recurse. It only iterates the contents of this 477 collection's corresponding stream. 478 """ 479 return self._items.keys() 480 481 @synchronized 482 def values(self) -> List[CollectionItem]: 483 """Get a list of objects in this collection's stream 484 485 The return value includes a `Subcollection` for every stream, and an 486 `arvados.arvfile.ArvadosFile` for every file, directly within this 487 collection's stream. This method does not recurse. 488 """ 489 return list(self._items.values()) 490 491 @synchronized 492 def items(self) -> List[Tuple[str, CollectionItem]]: 493 """Get a list of `(name, object)` tuples from this collection's stream 494 495 The return value includes a `Subcollection` for every stream, and an 496 `arvados.arvfile.ArvadosFile` for every file, directly within this 497 collection's stream. This method does not recurse. 498 """ 499 return list(self._items.items()) 500 501 def exists(self, path: str) -> bool: 502 """Indicate whether this collection includes an item at `path` 503 504 This method returns `True` if `path` refers to a stream or file within 505 this collection, else `False`. 506 507 Arguments: 508 509 * path: str --- The path to check for existence within this collection 510 """ 511 return self.find(path) is not None 512 513 @must_be_writable 514 @synchronized 515 def remove(self, path: str, recursive: bool=False) -> None: 516 """Remove the file or stream at `path` 517 518 Arguments: 519 520 * path: str --- The path of the item to remove from the collection 521 522 * recursive: bool --- Controls the method's behavior if `path` refers 523 to a nonempty stream. If `False` (the default), this method raises 524 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all 525 items under the stream. 526 """ 527 if not path: 528 raise errors.ArgumentError("Parameter 'path' is empty.") 529 530 pathcomponents = path.split("/", 1) 531 item = self._items.get(pathcomponents[0]) 532 if item is None: 533 raise IOError(errno.ENOENT, "File not found", path) 534 if len(pathcomponents) == 1: 535 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: 536 raise IOError(errno.ENOTEMPTY, "Directory not empty", path) 537 deleteditem = self._items[pathcomponents[0]] 538 del self._items[pathcomponents[0]] 539 self.set_committed(False) 540 self.notify(DEL, self, pathcomponents[0], deleteditem) 541 else: 542 item.remove(pathcomponents[1], recursive=recursive) 543 544 def _clonefrom(self, source): 545 for k,v in source.items(): 546 self._items[k] = v.clone(self, k) 547 548 def clone(self): 549 raise NotImplementedError() 550 551 @must_be_writable 552 @synchronized 553 def add( 554 self, 555 source_obj: CollectionItem, 556 target_name: str, 557 overwrite: bool=False, 558 reparent: bool=False, 559 ) -> None: 560 """Copy or move a file or subcollection object to this collection 561 562 Arguments: 563 564 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection 565 to add to this collection 566 567 * target_name: str --- The path inside this collection where 568 `source_obj` should be added. 569 570 * overwrite: bool --- Controls the behavior of this method when the 571 collection already contains an object at `target_name`. If `False` 572 (the default), this method will raise `FileExistsError`. If `True`, 573 the object at `target_name` will be replaced with `source_obj`. 574 575 * reparent: bool --- Controls whether this method copies or moves 576 `source_obj`. If `False` (the default), `source_obj` is copied into 577 this collection. If `True`, `source_obj` is moved into this 578 collection. 579 """ 580 if target_name in self and not overwrite: 581 raise IOError(errno.EEXIST, "File already exists", target_name) 582 583 modified_from = None 584 if target_name in self: 585 modified_from = self[target_name] 586 587 # Actually make the move or copy. 588 if reparent: 589 source_obj._reparent(self, target_name) 590 item = source_obj 591 else: 592 item = source_obj.clone(self, target_name) 593 594 self._items[target_name] = item 595 self.set_committed(False) 596 if not self._has_remote_blocks and source_obj.has_remote_blocks(): 597 self.set_has_remote_blocks(True) 598 599 if modified_from: 600 self.notify(MOD, self, target_name, (modified_from, item)) 601 else: 602 self.notify(ADD, self, target_name, item) 603 604 def _get_src_target(self, source, target_path, source_collection, create_dest): 605 if source_collection is None: 606 source_collection = self 607 608 # Find the object 609 if isinstance(source, str): 610 source_obj = source_collection.find(source) 611 if source_obj is None: 612 raise IOError(errno.ENOENT, "File not found", source) 613 sourcecomponents = source.split("/") 614 else: 615 source_obj = source 616 sourcecomponents = None 617 618 # Find parent collection the target path 619 targetcomponents = target_path.split("/") 620 621 # Determine the name to use. 622 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1] 623 624 if not target_name: 625 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.") 626 627 if create_dest: 628 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) 629 else: 630 if len(targetcomponents) > 1: 631 target_dir = self.find("/".join(targetcomponents[0:-1])) 632 else: 633 target_dir = self 634 635 if target_dir is None: 636 raise IOError(errno.ENOENT, "Target directory not found", target_name) 637 638 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents: 639 target_dir = target_dir[target_name] 640 target_name = sourcecomponents[-1] 641 642 return (source_obj, target_dir, target_name) 643 644 @must_be_writable 645 @synchronized 646 def copy( 647 self, 648 source: Union[str, CollectionItem], 649 target_path: str, 650 source_collection: Optional['RichCollectionBase']=None, 651 overwrite: bool=False, 652 ) -> None: 653 """Copy a file or subcollection object to this collection 654 655 Arguments: 656 657 * source: str | arvados.arvfile.ArvadosFile | 658 arvados.collection.Subcollection --- The file or subcollection to 659 add to this collection. If `source` is a str, the object will be 660 found by looking up this path from `source_collection` (see 661 below). 662 663 * target_path: str --- The path inside this collection where the 664 source object should be added. 665 666 * source_collection: arvados.collection.Collection | None --- The 667 collection to find the source object from when `source` is a 668 path. Defaults to the current collection (`self`). 669 670 * overwrite: bool --- Controls the behavior of this method when the 671 collection already contains an object at `target_path`. If `False` 672 (the default), this method will raise `FileExistsError`. If `True`, 673 the object at `target_path` will be replaced with `source_obj`. 674 """ 675 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) 676 target_dir.add(source_obj, target_name, overwrite, False) 677 678 @must_be_writable 679 @synchronized 680 def rename( 681 self, 682 source: Union[str, CollectionItem], 683 target_path: str, 684 source_collection: Optional['RichCollectionBase']=None, 685 overwrite: bool=False, 686 ) -> None: 687 """Move a file or subcollection object to this collection 688 689 Arguments: 690 691 * source: str | arvados.arvfile.ArvadosFile | 692 arvados.collection.Subcollection --- The file or subcollection to 693 add to this collection. If `source` is a str, the object will be 694 found by looking up this path from `source_collection` (see 695 below). 696 697 * target_path: str --- The path inside this collection where the 698 source object should be added. 699 700 * source_collection: arvados.collection.Collection | None --- The 701 collection to find the source object from when `source` is a 702 path. Defaults to the current collection (`self`). 703 704 * overwrite: bool --- Controls the behavior of this method when the 705 collection already contains an object at `target_path`. If `False` 706 (the default), this method will raise `FileExistsError`. If `True`, 707 the object at `target_path` will be replaced with `source_obj`. 708 """ 709 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) 710 if not source_obj.writable(): 711 raise IOError(errno.EROFS, "Source collection is read only", source) 712 target_dir.add(source_obj, target_name, overwrite, True) 713 714 def portable_manifest_text(self, stream_name: str=".") -> str: 715 """Get the portable manifest text for this collection 716 717 The portable manifest text is normalized, and does not include access 718 tokens. This method does not flush outstanding blocks to Keep. 719 720 Arguments: 721 722 * stream_name: str --- The name to use for this collection's stream in 723 the generated manifest. Default `'.'`. 724 """ 725 return self._get_manifest_text(stream_name, True, True) 726 727 @synchronized 728 def manifest_text( 729 self, 730 stream_name: str=".", 731 strip: bool=False, 732 normalize: bool=False, 733 only_committed: bool=False, 734 ) -> str: 735 """Get the manifest text for this collection 736 737 Arguments: 738 739 * stream_name: str --- The name to use for this collection's stream in 740 the generated manifest. Default `'.'`. 741 742 * strip: bool --- Controls whether or not the returned manifest text 743 includes access tokens. If `False` (the default), the manifest text 744 will include access tokens. If `True`, the manifest text will not 745 include access tokens. 746 747 * normalize: bool --- Controls whether or not the returned manifest 748 text is normalized. Default `False`. 749 750 * only_committed: bool --- Controls whether or not this method uploads 751 pending data to Keep before building and returning the manifest text. 752 If `False` (the default), this method will finish uploading all data 753 to Keep, then return the final manifest. If `True`, this method will 754 build and return a manifest that only refers to the data that has 755 finished uploading at the time this method was called. 756 """ 757 if not only_committed: 758 self._my_block_manager().commit_all() 759 return self._get_manifest_text(stream_name, strip, normalize, 760 only_committed=only_committed) 761 762 @synchronized 763 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 764 """Get the manifest text for this collection, sub collections and files. 765 766 :stream_name: 767 Name to use for this stream (directory) 768 769 :strip: 770 If True, remove signing tokens from block locators if present. 771 If False (default), block locators are left unchanged. 772 773 :normalize: 774 If True, always export the manifest text in normalized form 775 even if the Collection is not modified. If False (default) and the collection 776 is not modified, return the original manifest text even if it is not 777 in normalized form. 778 779 :only_committed: 780 If True, only include blocks that were already committed to Keep. 781 782 """ 783 784 if not self.committed() or self._manifest_text is None or normalize: 785 stream = {} 786 buf = [] 787 sorted_keys = sorted(self.keys()) 788 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]: 789 # Create a stream per file `k` 790 arvfile = self[filename] 791 filestream = [] 792 for segment in arvfile.segments(): 793 loc = segment.locator 794 if arvfile.parent._my_block_manager().is_bufferblock(loc): 795 if only_committed: 796 continue 797 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator() 798 if strip: 799 loc = KeepLocator(loc).stripped() 800 filestream.append(streams.LocatorAndRange( 801 loc, 802 KeepLocator(loc).size, 803 segment.segment_offset, 804 segment.range_size, 805 )) 806 stream[filename] = filestream 807 if stream: 808 buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n") 809 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]: 810 buf.append(self[dirname].manifest_text( 811 stream_name=os.path.join(stream_name, dirname), 812 strip=strip, normalize=True, only_committed=only_committed)) 813 return "".join(buf) 814 else: 815 if strip: 816 return self.stripped_manifest() 817 else: 818 return self._manifest_text 819 820 @synchronized 821 def _copy_remote_blocks(self, remote_blocks={}): 822 """Scan through the entire collection and ask Keep to copy remote blocks. 823 824 When accessing a remote collection, blocks will have a remote signature 825 (+R instead of +A). Collect these signatures and request Keep to copy the 826 blocks to the local cluster, returning local (+A) signatures. 827 828 :remote_blocks: 829 Shared cache of remote to local block mappings. This is used to avoid 830 doing extra work when blocks are shared by more than one file in 831 different subdirectories. 832 833 """ 834 for item in self: 835 remote_blocks = self[item]._copy_remote_blocks(remote_blocks) 836 return remote_blocks 837 838 @synchronized 839 def diff( 840 self, 841 end_collection: 'RichCollectionBase', 842 prefix: str=".", 843 holding_collection: Optional['Collection']=None, 844 ) -> ChangeList: 845 """Build a list of differences between this collection and another 846 847 Arguments: 848 849 * end_collection: arvados.collection.RichCollectionBase --- A 850 collection object with the desired end state. The returned diff 851 list will describe how to go from the current collection object 852 `self` to `end_collection`. 853 854 * prefix: str --- The name to use for this collection's stream in 855 the diff list. Default `'.'`. 856 857 * holding_collection: arvados.collection.Collection | None --- A 858 collection object used to hold objects for the returned diff 859 list. By default, a new empty collection is created. 860 """ 861 changes = [] 862 if holding_collection is None: 863 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep()) 864 for k in self: 865 if k not in end_collection: 866 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, ""))) 867 for k in end_collection: 868 if k in self: 869 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection): 870 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection)) 871 elif end_collection[k] != self[k]: 872 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 873 else: 874 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 875 else: 876 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, ""))) 877 return changes 878 879 @must_be_writable 880 @synchronized 881 def apply(self, changes: ChangeList) -> None: 882 """Apply a list of changes from to this collection 883 884 This method takes a list of changes generated by 885 `RichCollectionBase.diff` and applies it to this 886 collection. Afterward, the state of this collection object will 887 match the state of `end_collection` passed to `diff`. If a change 888 conflicts with a local change, it will be saved to an alternate path 889 indicating the conflict. 890 891 Arguments: 892 893 * changes: arvados.collection.ChangeList --- The list of differences 894 generated by `RichCollectionBase.diff`. 895 """ 896 if changes: 897 self.set_committed(False) 898 for change in changes: 899 event_type = change[0] 900 path = change[1] 901 initial = change[2] 902 local = self.find(path) 903 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S", 904 time.gmtime())) 905 if event_type == ADD: 906 if local is None: 907 # No local file at path, safe to copy over new file 908 self.copy(initial, path) 909 elif local is not None and local != initial: 910 # There is already local file and it is different: 911 # save change to conflict file. 912 self.copy(initial, conflictpath) 913 elif event_type == MOD or event_type == TOK: 914 final = change[3] 915 if local == initial: 916 # Local matches the "initial" item so it has not 917 # changed locally and is safe to update. 918 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile): 919 # Replace contents of local file with new contents 920 local.replace_contents(final) 921 else: 922 # Overwrite path with new item; this can happen if 923 # path was a file and is now a collection or vice versa 924 self.copy(final, path, overwrite=True) 925 else: 926 # Local is missing (presumably deleted) or local doesn't 927 # match the "start" value, so save change to conflict file 928 self.copy(final, conflictpath) 929 elif event_type == DEL: 930 if local == initial: 931 # Local item matches "initial" value, so it is safe to remove. 932 self.remove(path, recursive=True) 933 # else, the file is modified or already removed, in either 934 # case we don't want to try to remove it. 935 936 def portable_data_hash(self) -> str: 937 """Get the portable data hash for this collection's manifest""" 938 if self._manifest_locator and self.committed(): 939 # If the collection is already saved on the API server, and it's committed 940 # then return API server's PDH response. 941 return self._portable_data_hash 942 else: 943 stripped = self.portable_manifest_text().encode() 944 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) 945 946 @synchronized 947 def subscribe(self, callback: ChangeCallback) -> None: 948 """Set a notify callback for changes to this collection 949 950 Arguments: 951 952 * callback: arvados.collection.ChangeCallback --- The callable to 953 call each time the collection is changed. 954 """ 955 if self._callback is None: 956 self._callback = callback 957 else: 958 raise errors.ArgumentError("A callback is already set on this collection.") 959 960 @synchronized 961 def unsubscribe(self) -> None: 962 """Remove any notify callback set for changes to this collection""" 963 if self._callback is not None: 964 self._callback = None 965 966 @synchronized 967 def notify( 968 self, 969 event: ChangeType, 970 collection: 'RichCollectionBase', 971 name: str, 972 item: CollectionItem, 973 ) -> None: 974 """Notify any subscribed callback about a change to this collection 975 976 .. ATTENTION:: Internal 977 This method is only meant to be used by other Collection methods. 978 979 If a callback has been registered with `RichCollectionBase.subscribe`, 980 it will be called with information about a change to this collection. 981 Then this notification will be propagated to this collection's root. 982 983 Arguments: 984 985 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to 986 the collection. 987 988 * collection: arvados.collection.RichCollectionBase --- The 989 collection that was modified. 990 991 * name: str --- The name of the file or stream within `collection` that 992 was modified. 993 994 * item: arvados.arvfile.ArvadosFile | 995 arvados.collection.Subcollection --- The new contents at `name` 996 within `collection`. 997 """ 998 if self._callback: 999 self._callback(event, collection, name, item) 1000 self.root_collection().notify(event, collection, name, item) 1001 1002 @synchronized 1003 def __eq__(self, other: Any) -> bool: 1004 """Indicate whether this collection object is equal to another""" 1005 if other is self: 1006 return True 1007 if not isinstance(other, RichCollectionBase): 1008 return False 1009 if len(self._items) != len(other): 1010 return False 1011 for k in self._items: 1012 if k not in other: 1013 return False 1014 if self._items[k] != other[k]: 1015 return False 1016 return True 1017 1018 def __ne__(self, other: Any) -> bool: 1019 """Indicate whether this collection object is not equal to another""" 1020 return not self.__eq__(other) 1021 1022 @synchronized 1023 def flush(self) -> None: 1024 """Upload any pending data to Keep""" 1025 for e in self.values(): 1026 e.flush() 1027 1028 1029class Collection(RichCollectionBase): 1030 """Read and manipulate an Arvados collection 1031 1032 This class provides a high-level interface to create, read, and update 1033 Arvados collections and their contents. Refer to the Arvados Python SDK 1034 cookbook for [an introduction to using the Collection class][cookbook]. 1035 1036 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 1037 """ 1038 1039 def __init__(self, manifest_locator_or_text: Optional[str]=None, 1040 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, 1041 keep_client: Optional['arvados.keep.KeepClient']=None, 1042 num_retries: int=10, 1043 parent: Optional['Collection']=None, 1044 apiconfig: Optional[Mapping[str, str]]=None, 1045 block_manager: Optional['arvados.arvfile._BlockManager']=None, 1046 replication_desired: Optional[int]=None, 1047 storage_classes_desired: Optional[List[str]]=None, 1048 put_threads: Optional[int]=None): 1049 """Initialize a Collection object 1050 1051 Arguments: 1052 1053 * manifest_locator_or_text: str | None --- This string can contain a 1054 collection manifest text, portable data hash, or UUID. When given a 1055 portable data hash or UUID, this instance will load a collection 1056 record from the API server. Otherwise, this instance will represent a 1057 new collection without an API server record. The default value `None` 1058 instantiates a new collection with an empty manifest. 1059 1060 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The 1061 Arvados API client object this instance uses to make requests. If 1062 none is given, this instance creates its own client using the 1063 settings from `apiconfig` (see below). If your client instantiates 1064 many Collection objects, you can help limit memory utilization by 1065 calling `arvados.api.api` to construct an 1066 `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client` 1067 for every Collection. 1068 1069 * keep_client: arvados.keep.KeepClient | None --- The Keep client 1070 object this instance uses to make requests. If none is given, this 1071 instance creates its own client using its `api_client`. 1072 1073 * num_retries: int --- The number of times that client requests are 1074 retried. Default 10. 1075 1076 * parent: arvados.collection.Collection | None --- The parent Collection 1077 object of this instance, if any. This argument is primarily used by 1078 other Collection methods; user client code shouldn't need to use it. 1079 1080 * apiconfig: Mapping[str, str] | None --- A mapping with entries for 1081 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally 1082 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the 1083 Collection object constructs one from these settings. If no 1084 mapping is provided, calls `arvados.config.settings` to get these 1085 parameters from user configuration. 1086 1087 * block_manager: arvados.arvfile._BlockManager | None --- The 1088 _BlockManager object used by this instance to coordinate reading 1089 and writing Keep data blocks. If none is given, this instance 1090 constructs its own. This argument is primarily used by other 1091 Collection methods; user client code shouldn't need to use it. 1092 1093 * replication_desired: int | None --- This controls both the value of 1094 the `replication_desired` field on API collection records saved by 1095 this class, as well as the number of Keep services that the object 1096 writes new data blocks to. If none is given, uses the default value 1097 configured for the cluster. 1098 1099 * storage_classes_desired: list[str] | None --- This controls both 1100 the value of the `storage_classes_desired` field on API collection 1101 records saved by this class, as well as selecting which specific 1102 Keep services the object writes new data blocks to. If none is 1103 given, defaults to an empty list. 1104 1105 * put_threads: int | None --- The number of threads to run 1106 simultaneously to upload data blocks to Keep. This value is used when 1107 building a new `block_manager`. It is unused when a `block_manager` 1108 is provided. 1109 """ 1110 1111 if storage_classes_desired and type(storage_classes_desired) is not list: 1112 raise errors.ArgumentError("storage_classes_desired must be list type.") 1113 1114 super(Collection, self).__init__(parent) 1115 self._api_client = api_client 1116 self._keep_client = keep_client 1117 1118 # Use the keep client from ThreadSafeAPIClient 1119 if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient): 1120 self._keep_client = self._api_client.keep 1121 1122 self._block_manager = block_manager 1123 self.replication_desired = replication_desired 1124 self._storage_classes_desired = storage_classes_desired 1125 self.put_threads = put_threads 1126 1127 if apiconfig: 1128 self._config = apiconfig 1129 else: 1130 self._config = config.settings() 1131 1132 self.num_retries = num_retries 1133 self._manifest_locator = None 1134 self._manifest_text = None 1135 self._portable_data_hash = None 1136 self._api_response = None 1137 self._past_versions = set() 1138 1139 self.lock = threading.RLock() 1140 self.events = None 1141 1142 if manifest_locator_or_text: 1143 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1144 self._manifest_locator = manifest_locator_or_text 1145 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1146 self._manifest_locator = manifest_locator_or_text 1147 if not self._has_local_collection_uuid(): 1148 self._has_remote_blocks = True 1149 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1150 self._manifest_text = manifest_locator_or_text 1151 if '+R' in self._manifest_text: 1152 self._has_remote_blocks = True 1153 else: 1154 raise errors.ArgumentError( 1155 "Argument to CollectionReader is not a manifest or a collection UUID") 1156 1157 try: 1158 self._populate() 1159 except errors.SyntaxError as e: 1160 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None 1161 1162 def storage_classes_desired(self) -> List[str]: 1163 """Get this collection's `storage_classes_desired` value""" 1164 return self._storage_classes_desired or [] 1165 1166 def root_collection(self) -> 'Collection': 1167 return self 1168 1169 def get_properties(self) -> Properties: 1170 """Get this collection's properties 1171 1172 This method always returns a dict. If this collection object does not 1173 have an associated API record, or that record does not have any 1174 properties set, this method returns an empty dict. 1175 """ 1176 if self._api_response and self._api_response["properties"]: 1177 return self._api_response["properties"] 1178 else: 1179 return {} 1180 1181 def get_trash_at(self) -> Optional[datetime.datetime]: 1182 """Get this collection's `trash_at` field 1183 1184 This method parses the `trash_at` field of the collection's API 1185 record and returns a datetime from it. If that field is not set, or 1186 this collection object does not have an associated API record, 1187 returns None. 1188 """ 1189 if self._api_response and self._api_response["trash_at"]: 1190 try: 1191 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1192 except ValueError: 1193 return None 1194 else: 1195 return None 1196 1197 def stream_name(self) -> str: 1198 return "." 1199 1200 def writable(self) -> bool: 1201 return True 1202 1203 @synchronized 1204 def known_past_version( 1205 self, 1206 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] 1207 ) -> bool: 1208 """Indicate whether an API record for this collection has been seen before 1209 1210 As this collection object loads records from the API server, it records 1211 their `modified_at` and `portable_data_hash` fields. This method accepts 1212 a 2-tuple with values for those fields, and returns `True` if the 1213 combination was previously loaded. 1214 """ 1215 return modified_at_and_portable_data_hash in self._past_versions 1216 1217 @synchronized 1218 @retry_method 1219 def update( 1220 self, 1221 other: Optional['Collection']=None, 1222 num_retries: Optional[int]=None, 1223 ) -> None: 1224 """Merge another collection's contents into this one 1225 1226 This method compares the manifest of this collection instance with 1227 another, then updates this instance's manifest with changes from the 1228 other, renaming files to flag conflicts where necessary. 1229 1230 When called without any arguments, this method reloads the collection's 1231 API record, and updates this instance with any changes that have 1232 appeared server-side. If this instance does not have a corresponding 1233 API record, this method raises `arvados.errors.ArgumentError`. 1234 1235 Arguments: 1236 1237 * other: arvados.collection.Collection | None --- The collection 1238 whose contents should be merged into this instance. When not 1239 provided, this method reloads this collection's API record and 1240 constructs a Collection object from it. If this instance does not 1241 have a corresponding API record, this method raises 1242 `arvados.errors.ArgumentError`. 1243 1244 * num_retries: int | None --- The number of times to retry reloading 1245 the collection's API record from the API server. If not specified, 1246 uses the `num_retries` provided when this instance was constructed. 1247 """ 1248 if other is None: 1249 if self._manifest_locator is None: 1250 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1251 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1252 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and 1253 response.get("portable_data_hash") != self.portable_data_hash()): 1254 # The record on the server is different from our current one, but we've seen it before, 1255 # so ignore it because it's already been merged. 1256 # However, if it's the same as our current record, proceed with the update, because we want to update 1257 # our tokens. 1258 return 1259 else: 1260 self._remember_api_response(response) 1261 other = CollectionReader(response["manifest_text"]) 1262 baseline = CollectionReader(self._manifest_text) 1263 self.apply(baseline.diff(other)) 1264 self._manifest_text = self.manifest_text() 1265 1266 @synchronized 1267 def _my_api(self): 1268 if self._api_client is None: 1269 self._api_client = ThreadSafeAPIClient(self._config, version='v1') 1270 if self._keep_client is None: 1271 self._keep_client = self._api_client.keep 1272 return self._api_client 1273 1274 @synchronized 1275 def _my_keep(self): 1276 if self._keep_client is None: 1277 if self._api_client is None: 1278 self._my_api() 1279 else: 1280 self._keep_client = KeepClient(api_client=self._api_client) 1281 return self._keep_client 1282 1283 @synchronized 1284 def _my_block_manager(self): 1285 if self._block_manager is None: 1286 copies = (self.replication_desired or 1287 self._my_api()._rootDesc.get('defaultCollectionReplication', 1288 2)) 1289 self._block_manager = _BlockManager(self._my_keep(), 1290 copies=copies, 1291 put_threads=self.put_threads, 1292 num_retries=self.num_retries, 1293 storage_classes_func=self.storage_classes_desired) 1294 return self._block_manager 1295 1296 def _remember_api_response(self, response): 1297 self._api_response = response 1298 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) 1299 1300 def _populate_from_api_server(self): 1301 # As in KeepClient itself, we must wait until the last 1302 # possible moment to instantiate an API client, in order to 1303 # avoid tripping up clients that don't have access to an API 1304 # server. If we do build one, make sure our Keep client uses 1305 # it. If instantiation fails, we'll fall back to the except 1306 # clause, just like any other Collection lookup 1307 # failure. Return an exception, or None if successful. 1308 self._remember_api_response(self._my_api().collections().get( 1309 uuid=self._manifest_locator).execute( 1310 num_retries=self.num_retries)) 1311 self._manifest_text = self._api_response['manifest_text'] 1312 self._portable_data_hash = self._api_response['portable_data_hash'] 1313 # If not overriden via kwargs, we should try to load the 1314 # replication_desired and storage_classes_desired from the API server 1315 if self.replication_desired is None: 1316 self.replication_desired = self._api_response.get('replication_desired', None) 1317 if self._storage_classes_desired is None: 1318 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None) 1319 1320 def _populate(self): 1321 if self._manifest_text is None: 1322 if self._manifest_locator is None: 1323 return 1324 else: 1325 self._populate_from_api_server() 1326 self._baseline_manifest = self._manifest_text 1327 self._import_manifest(self._manifest_text) 1328 1329 def _has_collection_uuid(self): 1330 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) 1331 1332 def _has_local_collection_uuid(self): 1333 return self._has_collection_uuid and \ 1334 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0] 1335 1336 def __enter__(self): 1337 return self 1338 1339 def __exit__(self, exc_type, exc_value, traceback): 1340 """Exit a context with this collection instance 1341 1342 If no exception was raised inside the context block, and this 1343 collection is writable and has a corresponding API record, that 1344 record will be updated to match the state of this instance at the end 1345 of the block. 1346 """ 1347 if exc_type is None: 1348 if self.writable() and self._has_collection_uuid(): 1349 self.save() 1350 self.stop_threads() 1351 1352 def stop_threads(self) -> None: 1353 """Stop background Keep upload/download threads""" 1354 if self._block_manager is not None: 1355 self._block_manager.stop_threads() 1356 1357 @synchronized 1358 def manifest_locator(self) -> Optional[str]: 1359 """Get this collection's manifest locator, if any 1360 1361 * If this collection instance is associated with an API record with a 1362 UUID, return that. 1363 * Otherwise, if this collection instance was loaded from an API record 1364 by portable data hash, return that. 1365 * Otherwise, return `None`. 1366 """ 1367 return self._manifest_locator 1368 1369 @synchronized 1370 def clone( 1371 self, 1372 new_parent: Optional['Collection']=None, 1373 new_name: Optional[str]=None, 1374 readonly: bool=False, 1375 new_config: Optional[Mapping[str, str]]=None, 1376 ) -> 'Collection': 1377 """Create a Collection object with the same contents as this instance 1378 1379 This method creates a new Collection object with contents that match 1380 this instance's. The new collection will not be associated with any API 1381 record. 1382 1383 Arguments: 1384 1385 * new_parent: arvados.collection.Collection | None --- This value is 1386 passed to the new Collection's constructor as the `parent` 1387 argument. 1388 1389 * new_name: str | None --- This value is unused. 1390 1391 * readonly: bool --- If this value is true, this method constructs and 1392 returns a `CollectionReader`. Otherwise, it returns a mutable 1393 `Collection`. Default `False`. 1394 1395 * new_config: Mapping[str, str] | None --- This value is passed to the 1396 new Collection's constructor as `apiconfig`. If no value is provided, 1397 defaults to the configuration passed to this instance's constructor. 1398 """ 1399 if new_config is None: 1400 new_config = self._config 1401 if readonly: 1402 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1403 else: 1404 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1405 1406 newcollection._clonefrom(self) 1407 return newcollection 1408 1409 @synchronized 1410 def api_response(self) -> Optional[Dict[str, Any]]: 1411 """Get this instance's associated API record 1412 1413 If this Collection instance has an associated API record, return it. 1414 Otherwise, return `None`. 1415 """ 1416 return self._api_response 1417 1418 def find_or_create( 1419 self, 1420 path: str, 1421 create_type: CreateType, 1422 ) -> CollectionItem: 1423 if path == ".": 1424 return self 1425 else: 1426 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type) 1427 1428 def find(self, path: str) -> CollectionItem: 1429 if path == ".": 1430 return self 1431 else: 1432 return super(Collection, self).find(path[2:] if path.startswith("./") else path) 1433 1434 def remove(self, path: str, recursive: bool=False) -> None: 1435 if path == ".": 1436 raise errors.ArgumentError("Cannot remove '.'") 1437 else: 1438 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive) 1439 1440 @must_be_writable 1441 @synchronized 1442 @retry_method 1443 def save( 1444 self, 1445 properties: Optional[Properties]=None, 1446 storage_classes: Optional[StorageClasses]=None, 1447 trash_at: Optional[datetime.datetime]=None, 1448 merge: bool=True, 1449 num_retries: Optional[int]=None, 1450 preserve_version: bool=False, 1451 ) -> str: 1452 """Save collection to an existing API record 1453 1454 This method updates the instance's corresponding API record to match 1455 the instance's state. If this instance does not have a corresponding API 1456 record yet, raises `AssertionError`. (To create a new API record, use 1457 `Collection.save_new`.) This method returns the saved collection 1458 manifest. 1459 1460 Arguments: 1461 1462 * properties: dict[str, Any] | None --- If provided, the API record will 1463 be updated with these properties. Note this will completely replace 1464 any existing properties. 1465 1466 * storage_classes: list[str] | None --- If provided, the API record will 1467 be updated with this value in the `storage_classes_desired` field. 1468 This value will also be saved on the instance and used for any 1469 changes that follow. 1470 1471 * trash_at: datetime.datetime | None --- If provided, the API record 1472 will be updated with this value in the `trash_at` field. 1473 1474 * merge: bool --- If `True` (the default), this method will first 1475 reload this collection's API record, and merge any new contents into 1476 this instance before saving changes. See `Collection.update` for 1477 details. 1478 1479 * num_retries: int | None --- The number of times to retry reloading 1480 the collection's API record from the API server. If not specified, 1481 uses the `num_retries` provided when this instance was constructed. 1482 1483 * preserve_version: bool --- This value will be passed to directly 1484 to the underlying API call. If `True`, the Arvados API will 1485 preserve the versions of this collection both immediately before 1486 and after the update. If `True` when the API server is not 1487 configured with collection versioning, this method raises 1488 `arvados.errors.ArgumentError`. 1489 """ 1490 if properties and type(properties) is not dict: 1491 raise errors.ArgumentError("properties must be dictionary type.") 1492 1493 if storage_classes and type(storage_classes) is not list: 1494 raise errors.ArgumentError("storage_classes must be list type.") 1495 if storage_classes: 1496 self._storage_classes_desired = storage_classes 1497 1498 if trash_at and type(trash_at) is not datetime.datetime: 1499 raise errors.ArgumentError("trash_at must be datetime type.") 1500 1501 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1502 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1503 1504 body={} 1505 if properties: 1506 body["properties"] = properties 1507 if self.storage_classes_desired(): 1508 body["storage_classes_desired"] = self.storage_classes_desired() 1509 if trash_at: 1510 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1511 body["trash_at"] = t 1512 if preserve_version: 1513 body["preserve_version"] = preserve_version 1514 1515 if not self.committed(): 1516 if self._has_remote_blocks: 1517 # Copy any remote blocks to the local cluster. 1518 self._copy_remote_blocks(remote_blocks={}) 1519 self._has_remote_blocks = False 1520 if not self._has_collection_uuid(): 1521 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") 1522 elif not self._has_local_collection_uuid(): 1523 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.") 1524 1525 self._my_block_manager().commit_all() 1526 1527 if merge: 1528 self.update() 1529 1530 text = self.manifest_text(strip=False) 1531 body['manifest_text'] = text 1532 1533 self._remember_api_response(self._my_api().collections().update( 1534 uuid=self._manifest_locator, 1535 body=body 1536 ).execute(num_retries=num_retries)) 1537 self._manifest_text = self._api_response["manifest_text"] 1538 self._portable_data_hash = self._api_response["portable_data_hash"] 1539 self.set_committed(True) 1540 elif body: 1541 self._remember_api_response(self._my_api().collections().update( 1542 uuid=self._manifest_locator, 1543 body=body 1544 ).execute(num_retries=num_retries)) 1545 1546 return self._manifest_text 1547 1548 1549 @must_be_writable 1550 @synchronized 1551 @retry_method 1552 def save_new( 1553 self, 1554 name: Optional[str]=None, 1555 create_collection_record: bool=True, 1556 owner_uuid: Optional[str]=None, 1557 properties: Optional[Properties]=None, 1558 storage_classes: Optional[StorageClasses]=None, 1559 trash_at: Optional[datetime.datetime]=None, 1560 ensure_unique_name: bool=False, 1561 num_retries: Optional[int]=None, 1562 preserve_version: bool=False, 1563 ): 1564 """Save collection to a new API record 1565 1566 This method finishes uploading new data blocks and (optionally) 1567 creates a new API collection record with the provided data. If a new 1568 record is created, this instance becomes associated with that record 1569 for future updates like `save()`. This method returns the saved 1570 collection manifest. 1571 1572 Arguments: 1573 1574 * name: str | None --- The `name` field to use on the new collection 1575 record. If not specified, a generic default name is generated. 1576 1577 * create_collection_record: bool --- If `True` (the default), creates a 1578 collection record on the API server. If `False`, the method finishes 1579 all data uploads and only returns the resulting collection manifest 1580 without sending it to the API server. 1581 1582 * owner_uuid: str | None --- The `owner_uuid` field to use on the 1583 new collection record. 1584 1585 * properties: dict[str, Any] | None --- The `properties` field to use on 1586 the new collection record. 1587 1588 * storage_classes: list[str] | None --- The 1589 `storage_classes_desired` field to use on the new collection record. 1590 1591 * trash_at: datetime.datetime | None --- The `trash_at` field to use 1592 on the new collection record. 1593 1594 * ensure_unique_name: bool --- This value is passed directly to the 1595 Arvados API when creating the collection record. If `True`, the API 1596 server may modify the submitted `name` to ensure the collection's 1597 `name`+`owner_uuid` combination is unique. If `False` (the default), 1598 if a collection already exists with this same `name`+`owner_uuid` 1599 combination, creating a collection record will raise a validation 1600 error. 1601 1602 * num_retries: int | None --- The number of times to retry reloading 1603 the collection's API record from the API server. If not specified, 1604 uses the `num_retries` provided when this instance was constructed. 1605 1606 * preserve_version: bool --- This value will be passed to directly 1607 to the underlying API call. If `True`, the Arvados API will 1608 preserve the versions of this collection both immediately before 1609 and after the update. If `True` when the API server is not 1610 configured with collection versioning, this method raises 1611 `arvados.errors.ArgumentError`. 1612 """ 1613 if properties and type(properties) is not dict: 1614 raise errors.ArgumentError("properties must be dictionary type.") 1615 1616 if storage_classes and type(storage_classes) is not list: 1617 raise errors.ArgumentError("storage_classes must be list type.") 1618 1619 if trash_at and type(trash_at) is not datetime.datetime: 1620 raise errors.ArgumentError("trash_at must be datetime type.") 1621 1622 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1623 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1624 1625 if self._has_remote_blocks: 1626 # Copy any remote blocks to the local cluster. 1627 self._copy_remote_blocks(remote_blocks={}) 1628 self._has_remote_blocks = False 1629 1630 if storage_classes: 1631 self._storage_classes_desired = storage_classes 1632 1633 self._my_block_manager().commit_all() 1634 text = self.manifest_text(strip=False) 1635 1636 if create_collection_record: 1637 if name is None: 1638 name = "New collection" 1639 ensure_unique_name = True 1640 1641 body = {"manifest_text": text, 1642 "name": name, 1643 "replication_desired": self.replication_desired} 1644 if owner_uuid: 1645 body["owner_uuid"] = owner_uuid 1646 if properties: 1647 body["properties"] = properties 1648 if self.storage_classes_desired(): 1649 body["storage_classes_desired"] = self.storage_classes_desired() 1650 if trash_at: 1651 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1652 body["trash_at"] = t 1653 if preserve_version: 1654 body["preserve_version"] = preserve_version 1655 1656 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) 1657 text = self._api_response["manifest_text"] 1658 1659 self._manifest_locator = self._api_response["uuid"] 1660 self._portable_data_hash = self._api_response["portable_data_hash"] 1661 1662 self._manifest_text = text 1663 self.set_committed(True) 1664 1665 return text 1666 1667 _token_re = re.compile(r'(\S+)(\s+|$)') 1668 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') 1669 _segment_re = re.compile(r'(\d+):(\d+):(\S+)') 1670 1671 def _unescape_manifest_path(self, path): 1672 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) 1673 1674 @synchronized 1675 def _import_manifest(self, manifest_text): 1676 """Import a manifest into a `Collection`. 1677 1678 :manifest_text: 1679 The manifest text to import from. 1680 1681 """ 1682 if len(self) > 0: 1683 raise ArgumentError("Can only import manifest into an empty collection") 1684 1685 STREAM_NAME = 0 1686 BLOCKS = 1 1687 SEGMENTS = 2 1688 1689 stream_name = None 1690 state = STREAM_NAME 1691 1692 for token_and_separator in self._token_re.finditer(manifest_text): 1693 tok = token_and_separator.group(1) 1694 sep = token_and_separator.group(2) 1695 1696 if state == STREAM_NAME: 1697 # starting a new stream 1698 stream_name = self._unescape_manifest_path(tok) 1699 blocks = [] 1700 segments = [] 1701 streamoffset = 0 1702 state = BLOCKS 1703 self.find_or_create(stream_name, COLLECTION) 1704 continue 1705 1706 if state == BLOCKS: 1707 block_locator = self._block_re.match(tok) 1708 if block_locator: 1709 blocksize = int(block_locator.group(1)) 1710 blocks.append(streams.Range(tok, streamoffset, blocksize, 0)) 1711 streamoffset += blocksize 1712 else: 1713 state = SEGMENTS 1714 1715 if state == SEGMENTS: 1716 file_segment = self._segment_re.match(tok) 1717 if file_segment: 1718 pos = int(file_segment.group(1)) 1719 size = int(file_segment.group(2)) 1720 name = self._unescape_manifest_path(file_segment.group(3)) 1721 if name.split('/')[-1] == '.': 1722 # placeholder for persisting an empty directory, not a real file 1723 if len(name) > 2: 1724 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) 1725 else: 1726 filepath = os.path.join(stream_name, name) 1727 try: 1728 afile = self.find_or_create(filepath, FILE) 1729 except IOError as e: 1730 if e.errno == errno.ENOTDIR: 1731 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None 1732 else: 1733 raise e from None 1734 if isinstance(afile, ArvadosFile): 1735 afile.add_segment(blocks, pos, size) 1736 else: 1737 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) 1738 else: 1739 # error! 1740 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok) 1741 1742 if sep == "\n": 1743 stream_name = None 1744 state = STREAM_NAME 1745 1746 self.set_committed(True) 1747 1748 @synchronized 1749 def notify( 1750 self, 1751 event: ChangeType, 1752 collection: 'RichCollectionBase', 1753 name: str, 1754 item: CollectionItem, 1755 ) -> None: 1756 if self._callback: 1757 self._callback(event, collection, name, item) 1758 1759 1760class Subcollection(RichCollectionBase): 1761 """Read and manipulate a stream/directory within an Arvados collection 1762 1763 This class represents a single stream (like a directory) within an Arvados 1764 `Collection`. It is returned by `Collection.find` and provides the same API. 1765 Operations that work on the API collection record propagate to the parent 1766 `Collection` object. 1767 """ 1768 1769 def __init__(self, parent, name): 1770 super(Subcollection, self).__init__(parent) 1771 self.lock = self.root_collection().lock 1772 self._manifest_text = None 1773 self.name = name 1774 self.num_retries = parent.num_retries 1775 1776 def root_collection(self) -> 'Collection': 1777 return self.parent.root_collection() 1778 1779 def writable(self) -> bool: 1780 return self.root_collection().writable() 1781 1782 def _my_api(self): 1783 return self.root_collection()._my_api() 1784 1785 def _my_keep(self): 1786 return self.root_collection()._my_keep() 1787 1788 def _my_block_manager(self): 1789 return self.root_collection()._my_block_manager() 1790 1791 def stream_name(self) -> str: 1792 return os.path.join(self.parent.stream_name(), self.name) 1793 1794 @synchronized 1795 def clone( 1796 self, 1797 new_parent: Optional['Collection']=None, 1798 new_name: Optional[str]=None, 1799 ) -> 'Subcollection': 1800 c = Subcollection(new_parent, new_name) 1801 c._clonefrom(self) 1802 return c 1803 1804 @must_be_writable 1805 @synchronized 1806 def _reparent(self, newparent, newname): 1807 self.set_committed(False) 1808 self.flush() 1809 self.parent.remove(self.name, recursive=True) 1810 self.parent = newparent 1811 self.name = newname 1812 self.lock = self.parent.root_collection().lock 1813 1814 @synchronized 1815 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 1816 """Encode empty directories by using an \056-named (".") empty file""" 1817 if len(self._items) == 0: 1818 return "%s %s 0:0:\\056\n" % ( 1819 streams.escape(stream_name), config.EMPTY_BLOCK_LOCATOR) 1820 return super(Subcollection, self)._get_manifest_text(stream_name, 1821 strip, normalize, 1822 only_committed) 1823 1824 1825class CollectionReader(Collection): 1826 """Read-only `Collection` subclass 1827 1828 This class will never create or update any API collection records. You can 1829 use this class for additional code safety when you only need to read 1830 existing collections. 1831 """ 1832 def __init__(self, manifest_locator_or_text, *args, **kwargs): 1833 self._in_init = True 1834 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs) 1835 self._in_init = False 1836 1837 # Forego any locking since it should never change once initialized. 1838 self.lock = NoopLock() 1839 1840 # Backwards compatability with old CollectionReader 1841 # all_streams() and all_files() 1842 self._streams = None 1843 1844 def writable(self) -> bool: 1845 return self._in_init
Argument value for Collection
methods to represent an added item
Argument value for Collection
methods to represent a removed item
Argument value for Collection
methods to represent a modified item
Argument value for Collection
methods to represent an item with token differences
create_type
value for Collection.find_or_create
create_type
value for Collection.find_or_create
86class CollectionBase(object): 87 """Abstract base class for Collection classes 88 89 .. ATTENTION:: Internal 90 This class is meant to be used by other parts of the SDK. User code 91 should instantiate or subclass `Collection` or one of its subclasses 92 directly. 93 """ 94 95 def __enter__(self): 96 """Enter a context block with this collection instance""" 97 return self 98 99 def __exit__(self, exc_type, exc_value, traceback): 100 """Exit a context block with this collection instance""" 101 pass 102 103 def _my_keep(self): 104 if self._keep_client is None: 105 self._keep_client = KeepClient(api_client=self._api_client, 106 num_retries=self.num_retries) 107 return self._keep_client 108 109 def stripped_manifest(self) -> str: 110 """Create a copy of the collection manifest with only size hints 111 112 This method returns a string with the current collection's manifest 113 text with all non-portable locator hints like permission hints and 114 remote cluster hints removed. The only hints in the returned manifest 115 will be size hints. 116 """ 117 raw = self.manifest_text() 118 clean = [] 119 for line in raw.split("\n"): 120 fields = line.split() 121 if fields: 122 clean_fields = fields[:1] + [ 123 (re.sub(r'\+[^\d][^\+]*', '', x) 124 if re.match(arvados.util.keep_locator_pattern, x) 125 else x) 126 for x in fields[1:]] 127 clean += [' '.join(clean_fields), "\n"] 128 return ''.join(clean)
Abstract base class for Collection classes
109 def stripped_manifest(self) -> str: 110 """Create a copy of the collection manifest with only size hints 111 112 This method returns a string with the current collection's manifest 113 text with all non-portable locator hints like permission hints and 114 remote cluster hints removed. The only hints in the returned manifest 115 will be size hints. 116 """ 117 raw = self.manifest_text() 118 clean = [] 119 for line in raw.split("\n"): 120 fields = line.split() 121 if fields: 122 clean_fields = fields[:1] + [ 123 (re.sub(r'\+[^\d][^\+]*', '', x) 124 if re.match(arvados.util.keep_locator_pattern, x) 125 else x) 126 for x in fields[1:]] 127 clean += [' '.join(clean_fields), "\n"] 128 return ''.join(clean)
Create a copy of the collection manifest with only size hints
This method returns a string with the current collection’s manifest text with all non-portable locator hints like permission hints and remote cluster hints removed. The only hints in the returned manifest will be size hints.
154class RichCollectionBase(CollectionBase): 155 """Base class for Collection classes 156 157 .. ATTENTION:: Internal 158 This class is meant to be used by other parts of the SDK. User code 159 should instantiate or subclass `Collection` or one of its subclasses 160 directly. 161 """ 162 163 def __init__(self, parent=None): 164 self.parent = parent 165 self._committed = False 166 self._has_remote_blocks = False 167 self._callback = None 168 self._items = {} 169 170 def _my_api(self): 171 raise NotImplementedError() 172 173 def _my_keep(self): 174 raise NotImplementedError() 175 176 def _my_block_manager(self): 177 raise NotImplementedError() 178 179 def writable(self) -> bool: 180 """Indicate whether this collection object can be modified 181 182 This method returns `False` if this object is a `CollectionReader`, 183 else `True`. 184 """ 185 raise NotImplementedError() 186 187 def root_collection(self) -> 'Collection': 188 """Get this collection's root collection object 189 190 If you open a subcollection with `Collection.find`, calling this method 191 on that subcollection returns the source Collection object. 192 """ 193 raise NotImplementedError() 194 195 def stream_name(self) -> str: 196 """Get the name of the manifest stream represented by this collection 197 198 If you open a subcollection with `Collection.find`, calling this method 199 on that subcollection returns the name of the stream you opened. 200 """ 201 raise NotImplementedError() 202 203 @synchronized 204 def has_remote_blocks(self) -> bool: 205 """Indiciate whether the collection refers to remote data 206 207 Returns `True` if the collection manifest includes any Keep locators 208 with a remote hint (`+R`), else `False`. 209 """ 210 if self._has_remote_blocks: 211 return True 212 for item in self: 213 if self[item].has_remote_blocks(): 214 return True 215 return False 216 217 @synchronized 218 def set_has_remote_blocks(self, val: bool) -> None: 219 """Cache whether this collection refers to remote blocks 220 221 .. ATTENTION:: Internal 222 This method is only meant to be used by other Collection methods. 223 224 Set this collection's cached "has remote blocks" flag to the given 225 value. 226 """ 227 self._has_remote_blocks = val 228 if self.parent: 229 self.parent.set_has_remote_blocks(val) 230 231 @must_be_writable 232 @synchronized 233 def find_or_create( 234 self, 235 path: str, 236 create_type: CreateType, 237 ) -> CollectionItem: 238 """Get the item at the given path, creating it if necessary 239 240 If `path` refers to a stream in this collection, returns a 241 corresponding `Subcollection` object. If `path` refers to a file in 242 this collection, returns a corresponding 243 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 244 this collection, then this method creates a new object and returns 245 it, creating parent streams as needed. The type of object created is 246 determined by the value of `create_type`. 247 248 Arguments: 249 250 * path: str --- The path to find or create within this collection. 251 252 * create_type: Literal[COLLECTION, FILE] --- The type of object to 253 create at `path` if one does not exist. Passing `COLLECTION` 254 creates a stream and returns the corresponding 255 `Subcollection`. Passing `FILE` creates a new file and returns the 256 corresponding `arvados.arvfile.ArvadosFile`. 257 """ 258 pathcomponents = path.split("/", 1) 259 if pathcomponents[0]: 260 item = self._items.get(pathcomponents[0]) 261 if len(pathcomponents) == 1: 262 if item is None: 263 # create new file 264 if create_type == COLLECTION: 265 item = Subcollection(self, pathcomponents[0]) 266 else: 267 item = ArvadosFile(self, pathcomponents[0]) 268 self._items[pathcomponents[0]] = item 269 self.set_committed(False) 270 self.notify(ADD, self, pathcomponents[0], item) 271 return item 272 else: 273 if item is None: 274 # create new collection 275 item = Subcollection(self, pathcomponents[0]) 276 self._items[pathcomponents[0]] = item 277 self.set_committed(False) 278 self.notify(ADD, self, pathcomponents[0], item) 279 if isinstance(item, RichCollectionBase): 280 return item.find_or_create(pathcomponents[1], create_type) 281 else: 282 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 283 else: 284 return self 285 286 @synchronized 287 def find(self, path: str) -> CollectionItem: 288 """Get the item at the given path 289 290 If `path` refers to a stream in this collection, returns a 291 corresponding `Subcollection` object. If `path` refers to a file in 292 this collection, returns a corresponding 293 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 294 this collection, then this method raises `NotADirectoryError`. 295 296 Arguments: 297 298 * path: str --- The path to find or create within this collection. 299 """ 300 if not path: 301 raise errors.ArgumentError("Parameter 'path' is empty.") 302 303 pathcomponents = path.split("/", 1) 304 if pathcomponents[0] == '': 305 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 306 307 item = self._items.get(pathcomponents[0]) 308 if item is None: 309 return None 310 elif len(pathcomponents) == 1: 311 return item 312 else: 313 if isinstance(item, RichCollectionBase): 314 if pathcomponents[1]: 315 return item.find(pathcomponents[1]) 316 else: 317 return item 318 else: 319 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 320 321 @synchronized 322 def mkdirs(self, path: str) -> 'Subcollection': 323 """Create and return a subcollection at `path` 324 325 If `path` exists within this collection, raises `FileExistsError`. 326 Otherwise, creates a stream at that path and returns the 327 corresponding `Subcollection`. 328 """ 329 if self.find(path) != None: 330 raise IOError(errno.EEXIST, "Directory or file exists", path) 331 332 return self.find_or_create(path, COLLECTION) 333 334 def open( 335 self, 336 path: str, 337 mode: str="r", 338 encoding: Optional[str]=None 339 ) -> IO: 340 """Open a file-like object within the collection 341 342 This method returns a file-like object that can read and/or write the 343 file located at `path` within the collection. If you attempt to write 344 a `path` that does not exist, the file is created with `find_or_create`. 345 If the file cannot be opened for any other reason, this method raises 346 `OSError` with an appropriate errno. 347 348 Arguments: 349 350 * path: str --- The path of the file to open within this collection 351 352 * mode: str --- The mode to open this file. Supports all the same 353 values as `builtins.open`. 354 355 * encoding: str | None --- The text encoding of the file. Only used 356 when the file is opened in text mode. The default is 357 platform-dependent. 358 359 """ 360 if not re.search(r'^[rwa][bt]?\+?$', mode): 361 raise errors.ArgumentError("Invalid mode {!r}".format(mode)) 362 363 if mode[0] == 'r' and '+' not in mode: 364 fclass = ArvadosFileReader 365 arvfile = self.find(path) 366 elif not self.writable(): 367 raise IOError(errno.EROFS, "Collection is read only") 368 else: 369 fclass = ArvadosFileWriter 370 arvfile = self.find_or_create(path, FILE) 371 372 if arvfile is None: 373 raise IOError(errno.ENOENT, "File not found", path) 374 if not isinstance(arvfile, ArvadosFile): 375 raise IOError(errno.EISDIR, "Is a directory", path) 376 377 if mode[0] == 'w': 378 arvfile.truncate(0) 379 380 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) 381 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) 382 if 'b' not in mode: 383 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader 384 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) 385 return f 386 387 def modified(self) -> bool: 388 """Indicate whether this collection has an API server record 389 390 Returns `False` if this collection corresponds to a record loaded from 391 the API server, `True` otherwise. 392 """ 393 return not self.committed() 394 395 @synchronized 396 def committed(self): 397 """Indicate whether this collection has an API server record 398 399 Returns `True` if this collection corresponds to a record loaded from 400 the API server, `False` otherwise. 401 """ 402 return self._committed 403 404 @synchronized 405 def set_committed(self, value: bool=True): 406 """Cache whether this collection has an API server record 407 408 .. ATTENTION:: Internal 409 This method is only meant to be used by other Collection methods. 410 411 Set this collection's cached "committed" flag to the given 412 value and propagates it as needed. 413 """ 414 if value == self._committed: 415 return 416 if value: 417 for k,v in self._items.items(): 418 v.set_committed(True) 419 self._committed = True 420 else: 421 self._committed = False 422 if self.parent is not None: 423 self.parent.set_committed(False) 424 425 @synchronized 426 def __iter__(self) -> Iterator[str]: 427 """Iterate names of streams and files in this collection 428 429 This method does not recurse. It only iterates the contents of this 430 collection's corresponding stream. 431 """ 432 return iter(self._items) 433 434 @synchronized 435 def __getitem__(self, k: str) -> CollectionItem: 436 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection 437 438 This method does not recurse. If you want to search a path, use 439 `RichCollectionBase.find` instead. 440 """ 441 return self._items[k] 442 443 @synchronized 444 def __contains__(self, k: str) -> bool: 445 """Indicate whether this collection has an item with this name 446 447 This method does not recurse. It you want to check a path, use 448 `RichCollectionBase.exists` instead. 449 """ 450 return k in self._items 451 452 @synchronized 453 def __len__(self): 454 """Get the number of items directly contained in this collection 455 456 This method does not recurse. It only counts the streams and files 457 in this collection's corresponding stream. 458 """ 459 return len(self._items) 460 461 @must_be_writable 462 @synchronized 463 def __delitem__(self, p: str) -> None: 464 """Delete an item from this collection's stream 465 466 This method does not recurse. If you want to remove an item by a 467 path, use `RichCollectionBase.remove` instead. 468 """ 469 del self._items[p] 470 self.set_committed(False) 471 self.notify(DEL, self, p, None) 472 473 @synchronized 474 def keys(self) -> Iterator[str]: 475 """Iterate names of streams and files in this collection 476 477 This method does not recurse. It only iterates the contents of this 478 collection's corresponding stream. 479 """ 480 return self._items.keys() 481 482 @synchronized 483 def values(self) -> List[CollectionItem]: 484 """Get a list of objects in this collection's stream 485 486 The return value includes a `Subcollection` for every stream, and an 487 `arvados.arvfile.ArvadosFile` for every file, directly within this 488 collection's stream. This method does not recurse. 489 """ 490 return list(self._items.values()) 491 492 @synchronized 493 def items(self) -> List[Tuple[str, CollectionItem]]: 494 """Get a list of `(name, object)` tuples from this collection's stream 495 496 The return value includes a `Subcollection` for every stream, and an 497 `arvados.arvfile.ArvadosFile` for every file, directly within this 498 collection's stream. This method does not recurse. 499 """ 500 return list(self._items.items()) 501 502 def exists(self, path: str) -> bool: 503 """Indicate whether this collection includes an item at `path` 504 505 This method returns `True` if `path` refers to a stream or file within 506 this collection, else `False`. 507 508 Arguments: 509 510 * path: str --- The path to check for existence within this collection 511 """ 512 return self.find(path) is not None 513 514 @must_be_writable 515 @synchronized 516 def remove(self, path: str, recursive: bool=False) -> None: 517 """Remove the file or stream at `path` 518 519 Arguments: 520 521 * path: str --- The path of the item to remove from the collection 522 523 * recursive: bool --- Controls the method's behavior if `path` refers 524 to a nonempty stream. If `False` (the default), this method raises 525 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all 526 items under the stream. 527 """ 528 if not path: 529 raise errors.ArgumentError("Parameter 'path' is empty.") 530 531 pathcomponents = path.split("/", 1) 532 item = self._items.get(pathcomponents[0]) 533 if item is None: 534 raise IOError(errno.ENOENT, "File not found", path) 535 if len(pathcomponents) == 1: 536 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: 537 raise IOError(errno.ENOTEMPTY, "Directory not empty", path) 538 deleteditem = self._items[pathcomponents[0]] 539 del self._items[pathcomponents[0]] 540 self.set_committed(False) 541 self.notify(DEL, self, pathcomponents[0], deleteditem) 542 else: 543 item.remove(pathcomponents[1], recursive=recursive) 544 545 def _clonefrom(self, source): 546 for k,v in source.items(): 547 self._items[k] = v.clone(self, k) 548 549 def clone(self): 550 raise NotImplementedError() 551 552 @must_be_writable 553 @synchronized 554 def add( 555 self, 556 source_obj: CollectionItem, 557 target_name: str, 558 overwrite: bool=False, 559 reparent: bool=False, 560 ) -> None: 561 """Copy or move a file or subcollection object to this collection 562 563 Arguments: 564 565 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection 566 to add to this collection 567 568 * target_name: str --- The path inside this collection where 569 `source_obj` should be added. 570 571 * overwrite: bool --- Controls the behavior of this method when the 572 collection already contains an object at `target_name`. If `False` 573 (the default), this method will raise `FileExistsError`. If `True`, 574 the object at `target_name` will be replaced with `source_obj`. 575 576 * reparent: bool --- Controls whether this method copies or moves 577 `source_obj`. If `False` (the default), `source_obj` is copied into 578 this collection. If `True`, `source_obj` is moved into this 579 collection. 580 """ 581 if target_name in self and not overwrite: 582 raise IOError(errno.EEXIST, "File already exists", target_name) 583 584 modified_from = None 585 if target_name in self: 586 modified_from = self[target_name] 587 588 # Actually make the move or copy. 589 if reparent: 590 source_obj._reparent(self, target_name) 591 item = source_obj 592 else: 593 item = source_obj.clone(self, target_name) 594 595 self._items[target_name] = item 596 self.set_committed(False) 597 if not self._has_remote_blocks and source_obj.has_remote_blocks(): 598 self.set_has_remote_blocks(True) 599 600 if modified_from: 601 self.notify(MOD, self, target_name, (modified_from, item)) 602 else: 603 self.notify(ADD, self, target_name, item) 604 605 def _get_src_target(self, source, target_path, source_collection, create_dest): 606 if source_collection is None: 607 source_collection = self 608 609 # Find the object 610 if isinstance(source, str): 611 source_obj = source_collection.find(source) 612 if source_obj is None: 613 raise IOError(errno.ENOENT, "File not found", source) 614 sourcecomponents = source.split("/") 615 else: 616 source_obj = source 617 sourcecomponents = None 618 619 # Find parent collection the target path 620 targetcomponents = target_path.split("/") 621 622 # Determine the name to use. 623 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1] 624 625 if not target_name: 626 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.") 627 628 if create_dest: 629 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) 630 else: 631 if len(targetcomponents) > 1: 632 target_dir = self.find("/".join(targetcomponents[0:-1])) 633 else: 634 target_dir = self 635 636 if target_dir is None: 637 raise IOError(errno.ENOENT, "Target directory not found", target_name) 638 639 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents: 640 target_dir = target_dir[target_name] 641 target_name = sourcecomponents[-1] 642 643 return (source_obj, target_dir, target_name) 644 645 @must_be_writable 646 @synchronized 647 def copy( 648 self, 649 source: Union[str, CollectionItem], 650 target_path: str, 651 source_collection: Optional['RichCollectionBase']=None, 652 overwrite: bool=False, 653 ) -> None: 654 """Copy a file or subcollection object to this collection 655 656 Arguments: 657 658 * source: str | arvados.arvfile.ArvadosFile | 659 arvados.collection.Subcollection --- The file or subcollection to 660 add to this collection. If `source` is a str, the object will be 661 found by looking up this path from `source_collection` (see 662 below). 663 664 * target_path: str --- The path inside this collection where the 665 source object should be added. 666 667 * source_collection: arvados.collection.Collection | None --- The 668 collection to find the source object from when `source` is a 669 path. Defaults to the current collection (`self`). 670 671 * overwrite: bool --- Controls the behavior of this method when the 672 collection already contains an object at `target_path`. If `False` 673 (the default), this method will raise `FileExistsError`. If `True`, 674 the object at `target_path` will be replaced with `source_obj`. 675 """ 676 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) 677 target_dir.add(source_obj, target_name, overwrite, False) 678 679 @must_be_writable 680 @synchronized 681 def rename( 682 self, 683 source: Union[str, CollectionItem], 684 target_path: str, 685 source_collection: Optional['RichCollectionBase']=None, 686 overwrite: bool=False, 687 ) -> None: 688 """Move a file or subcollection object to this collection 689 690 Arguments: 691 692 * source: str | arvados.arvfile.ArvadosFile | 693 arvados.collection.Subcollection --- The file or subcollection to 694 add to this collection. If `source` is a str, the object will be 695 found by looking up this path from `source_collection` (see 696 below). 697 698 * target_path: str --- The path inside this collection where the 699 source object should be added. 700 701 * source_collection: arvados.collection.Collection | None --- The 702 collection to find the source object from when `source` is a 703 path. Defaults to the current collection (`self`). 704 705 * overwrite: bool --- Controls the behavior of this method when the 706 collection already contains an object at `target_path`. If `False` 707 (the default), this method will raise `FileExistsError`. If `True`, 708 the object at `target_path` will be replaced with `source_obj`. 709 """ 710 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) 711 if not source_obj.writable(): 712 raise IOError(errno.EROFS, "Source collection is read only", source) 713 target_dir.add(source_obj, target_name, overwrite, True) 714 715 def portable_manifest_text(self, stream_name: str=".") -> str: 716 """Get the portable manifest text for this collection 717 718 The portable manifest text is normalized, and does not include access 719 tokens. This method does not flush outstanding blocks to Keep. 720 721 Arguments: 722 723 * stream_name: str --- The name to use for this collection's stream in 724 the generated manifest. Default `'.'`. 725 """ 726 return self._get_manifest_text(stream_name, True, True) 727 728 @synchronized 729 def manifest_text( 730 self, 731 stream_name: str=".", 732 strip: bool=False, 733 normalize: bool=False, 734 only_committed: bool=False, 735 ) -> str: 736 """Get the manifest text for this collection 737 738 Arguments: 739 740 * stream_name: str --- The name to use for this collection's stream in 741 the generated manifest. Default `'.'`. 742 743 * strip: bool --- Controls whether or not the returned manifest text 744 includes access tokens. If `False` (the default), the manifest text 745 will include access tokens. If `True`, the manifest text will not 746 include access tokens. 747 748 * normalize: bool --- Controls whether or not the returned manifest 749 text is normalized. Default `False`. 750 751 * only_committed: bool --- Controls whether or not this method uploads 752 pending data to Keep before building and returning the manifest text. 753 If `False` (the default), this method will finish uploading all data 754 to Keep, then return the final manifest. If `True`, this method will 755 build and return a manifest that only refers to the data that has 756 finished uploading at the time this method was called. 757 """ 758 if not only_committed: 759 self._my_block_manager().commit_all() 760 return self._get_manifest_text(stream_name, strip, normalize, 761 only_committed=only_committed) 762 763 @synchronized 764 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 765 """Get the manifest text for this collection, sub collections and files. 766 767 :stream_name: 768 Name to use for this stream (directory) 769 770 :strip: 771 If True, remove signing tokens from block locators if present. 772 If False (default), block locators are left unchanged. 773 774 :normalize: 775 If True, always export the manifest text in normalized form 776 even if the Collection is not modified. If False (default) and the collection 777 is not modified, return the original manifest text even if it is not 778 in normalized form. 779 780 :only_committed: 781 If True, only include blocks that were already committed to Keep. 782 783 """ 784 785 if not self.committed() or self._manifest_text is None or normalize: 786 stream = {} 787 buf = [] 788 sorted_keys = sorted(self.keys()) 789 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]: 790 # Create a stream per file `k` 791 arvfile = self[filename] 792 filestream = [] 793 for segment in arvfile.segments(): 794 loc = segment.locator 795 if arvfile.parent._my_block_manager().is_bufferblock(loc): 796 if only_committed: 797 continue 798 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator() 799 if strip: 800 loc = KeepLocator(loc).stripped() 801 filestream.append(streams.LocatorAndRange( 802 loc, 803 KeepLocator(loc).size, 804 segment.segment_offset, 805 segment.range_size, 806 )) 807 stream[filename] = filestream 808 if stream: 809 buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n") 810 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]: 811 buf.append(self[dirname].manifest_text( 812 stream_name=os.path.join(stream_name, dirname), 813 strip=strip, normalize=True, only_committed=only_committed)) 814 return "".join(buf) 815 else: 816 if strip: 817 return self.stripped_manifest() 818 else: 819 return self._manifest_text 820 821 @synchronized 822 def _copy_remote_blocks(self, remote_blocks={}): 823 """Scan through the entire collection and ask Keep to copy remote blocks. 824 825 When accessing a remote collection, blocks will have a remote signature 826 (+R instead of +A). Collect these signatures and request Keep to copy the 827 blocks to the local cluster, returning local (+A) signatures. 828 829 :remote_blocks: 830 Shared cache of remote to local block mappings. This is used to avoid 831 doing extra work when blocks are shared by more than one file in 832 different subdirectories. 833 834 """ 835 for item in self: 836 remote_blocks = self[item]._copy_remote_blocks(remote_blocks) 837 return remote_blocks 838 839 @synchronized 840 def diff( 841 self, 842 end_collection: 'RichCollectionBase', 843 prefix: str=".", 844 holding_collection: Optional['Collection']=None, 845 ) -> ChangeList: 846 """Build a list of differences between this collection and another 847 848 Arguments: 849 850 * end_collection: arvados.collection.RichCollectionBase --- A 851 collection object with the desired end state. The returned diff 852 list will describe how to go from the current collection object 853 `self` to `end_collection`. 854 855 * prefix: str --- The name to use for this collection's stream in 856 the diff list. Default `'.'`. 857 858 * holding_collection: arvados.collection.Collection | None --- A 859 collection object used to hold objects for the returned diff 860 list. By default, a new empty collection is created. 861 """ 862 changes = [] 863 if holding_collection is None: 864 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep()) 865 for k in self: 866 if k not in end_collection: 867 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, ""))) 868 for k in end_collection: 869 if k in self: 870 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection): 871 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection)) 872 elif end_collection[k] != self[k]: 873 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 874 else: 875 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 876 else: 877 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, ""))) 878 return changes 879 880 @must_be_writable 881 @synchronized 882 def apply(self, changes: ChangeList) -> None: 883 """Apply a list of changes from to this collection 884 885 This method takes a list of changes generated by 886 `RichCollectionBase.diff` and applies it to this 887 collection. Afterward, the state of this collection object will 888 match the state of `end_collection` passed to `diff`. If a change 889 conflicts with a local change, it will be saved to an alternate path 890 indicating the conflict. 891 892 Arguments: 893 894 * changes: arvados.collection.ChangeList --- The list of differences 895 generated by `RichCollectionBase.diff`. 896 """ 897 if changes: 898 self.set_committed(False) 899 for change in changes: 900 event_type = change[0] 901 path = change[1] 902 initial = change[2] 903 local = self.find(path) 904 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S", 905 time.gmtime())) 906 if event_type == ADD: 907 if local is None: 908 # No local file at path, safe to copy over new file 909 self.copy(initial, path) 910 elif local is not None and local != initial: 911 # There is already local file and it is different: 912 # save change to conflict file. 913 self.copy(initial, conflictpath) 914 elif event_type == MOD or event_type == TOK: 915 final = change[3] 916 if local == initial: 917 # Local matches the "initial" item so it has not 918 # changed locally and is safe to update. 919 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile): 920 # Replace contents of local file with new contents 921 local.replace_contents(final) 922 else: 923 # Overwrite path with new item; this can happen if 924 # path was a file and is now a collection or vice versa 925 self.copy(final, path, overwrite=True) 926 else: 927 # Local is missing (presumably deleted) or local doesn't 928 # match the "start" value, so save change to conflict file 929 self.copy(final, conflictpath) 930 elif event_type == DEL: 931 if local == initial: 932 # Local item matches "initial" value, so it is safe to remove. 933 self.remove(path, recursive=True) 934 # else, the file is modified or already removed, in either 935 # case we don't want to try to remove it. 936 937 def portable_data_hash(self) -> str: 938 """Get the portable data hash for this collection's manifest""" 939 if self._manifest_locator and self.committed(): 940 # If the collection is already saved on the API server, and it's committed 941 # then return API server's PDH response. 942 return self._portable_data_hash 943 else: 944 stripped = self.portable_manifest_text().encode() 945 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) 946 947 @synchronized 948 def subscribe(self, callback: ChangeCallback) -> None: 949 """Set a notify callback for changes to this collection 950 951 Arguments: 952 953 * callback: arvados.collection.ChangeCallback --- The callable to 954 call each time the collection is changed. 955 """ 956 if self._callback is None: 957 self._callback = callback 958 else: 959 raise errors.ArgumentError("A callback is already set on this collection.") 960 961 @synchronized 962 def unsubscribe(self) -> None: 963 """Remove any notify callback set for changes to this collection""" 964 if self._callback is not None: 965 self._callback = None 966 967 @synchronized 968 def notify( 969 self, 970 event: ChangeType, 971 collection: 'RichCollectionBase', 972 name: str, 973 item: CollectionItem, 974 ) -> None: 975 """Notify any subscribed callback about a change to this collection 976 977 .. ATTENTION:: Internal 978 This method is only meant to be used by other Collection methods. 979 980 If a callback has been registered with `RichCollectionBase.subscribe`, 981 it will be called with information about a change to this collection. 982 Then this notification will be propagated to this collection's root. 983 984 Arguments: 985 986 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to 987 the collection. 988 989 * collection: arvados.collection.RichCollectionBase --- The 990 collection that was modified. 991 992 * name: str --- The name of the file or stream within `collection` that 993 was modified. 994 995 * item: arvados.arvfile.ArvadosFile | 996 arvados.collection.Subcollection --- The new contents at `name` 997 within `collection`. 998 """ 999 if self._callback: 1000 self._callback(event, collection, name, item) 1001 self.root_collection().notify(event, collection, name, item) 1002 1003 @synchronized 1004 def __eq__(self, other: Any) -> bool: 1005 """Indicate whether this collection object is equal to another""" 1006 if other is self: 1007 return True 1008 if not isinstance(other, RichCollectionBase): 1009 return False 1010 if len(self._items) != len(other): 1011 return False 1012 for k in self._items: 1013 if k not in other: 1014 return False 1015 if self._items[k] != other[k]: 1016 return False 1017 return True 1018 1019 def __ne__(self, other: Any) -> bool: 1020 """Indicate whether this collection object is not equal to another""" 1021 return not self.__eq__(other) 1022 1023 @synchronized 1024 def flush(self) -> None: 1025 """Upload any pending data to Keep""" 1026 for e in self.values(): 1027 e.flush()
Base class for Collection classes
179 def writable(self) -> bool: 180 """Indicate whether this collection object can be modified 181 182 This method returns `False` if this object is a `CollectionReader`, 183 else `True`. 184 """ 185 raise NotImplementedError()
Indicate whether this collection object can be modified
This method returns False
if this object is a CollectionReader
,
else True
.
187 def root_collection(self) -> 'Collection': 188 """Get this collection's root collection object 189 190 If you open a subcollection with `Collection.find`, calling this method 191 on that subcollection returns the source Collection object. 192 """ 193 raise NotImplementedError()
Get this collection’s root collection object
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the source Collection object.
195 def stream_name(self) -> str: 196 """Get the name of the manifest stream represented by this collection 197 198 If you open a subcollection with `Collection.find`, calling this method 199 on that subcollection returns the name of the stream you opened. 200 """ 201 raise NotImplementedError()
Get the name of the manifest stream represented by this collection
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the name of the stream you opened.
203 @synchronized 204 def has_remote_blocks(self) -> bool: 205 """Indiciate whether the collection refers to remote data 206 207 Returns `True` if the collection manifest includes any Keep locators 208 with a remote hint (`+R`), else `False`. 209 """ 210 if self._has_remote_blocks: 211 return True 212 for item in self: 213 if self[item].has_remote_blocks(): 214 return True 215 return False
Indiciate whether the collection refers to remote data
Returns True
if the collection manifest includes any Keep locators
with a remote hint (+R
), else False
.
217 @synchronized 218 def set_has_remote_blocks(self, val: bool) -> None: 219 """Cache whether this collection refers to remote blocks 220 221 .. ATTENTION:: Internal 222 This method is only meant to be used by other Collection methods. 223 224 Set this collection's cached "has remote blocks" flag to the given 225 value. 226 """ 227 self._has_remote_blocks = val 228 if self.parent: 229 self.parent.set_has_remote_blocks(val)
Cache whether this collection refers to remote blocks
Set this collection’s cached “has remote blocks” flag to the given value.
231 @must_be_writable 232 @synchronized 233 def find_or_create( 234 self, 235 path: str, 236 create_type: CreateType, 237 ) -> CollectionItem: 238 """Get the item at the given path, creating it if necessary 239 240 If `path` refers to a stream in this collection, returns a 241 corresponding `Subcollection` object. If `path` refers to a file in 242 this collection, returns a corresponding 243 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 244 this collection, then this method creates a new object and returns 245 it, creating parent streams as needed. The type of object created is 246 determined by the value of `create_type`. 247 248 Arguments: 249 250 * path: str --- The path to find or create within this collection. 251 252 * create_type: Literal[COLLECTION, FILE] --- The type of object to 253 create at `path` if one does not exist. Passing `COLLECTION` 254 creates a stream and returns the corresponding 255 `Subcollection`. Passing `FILE` creates a new file and returns the 256 corresponding `arvados.arvfile.ArvadosFile`. 257 """ 258 pathcomponents = path.split("/", 1) 259 if pathcomponents[0]: 260 item = self._items.get(pathcomponents[0]) 261 if len(pathcomponents) == 1: 262 if item is None: 263 # create new file 264 if create_type == COLLECTION: 265 item = Subcollection(self, pathcomponents[0]) 266 else: 267 item = ArvadosFile(self, pathcomponents[0]) 268 self._items[pathcomponents[0]] = item 269 self.set_committed(False) 270 self.notify(ADD, self, pathcomponents[0], item) 271 return item 272 else: 273 if item is None: 274 # create new collection 275 item = Subcollection(self, pathcomponents[0]) 276 self._items[pathcomponents[0]] = item 277 self.set_committed(False) 278 self.notify(ADD, self, pathcomponents[0], item) 279 if isinstance(item, RichCollectionBase): 280 return item.find_or_create(pathcomponents[1], create_type) 281 else: 282 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 283 else: 284 return self
Get the item at the given path, creating it if necessary
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method creates a new object and returns
it, creating parent streams as needed. The type of object created is
determined by the value of create_type
.
Arguments:
path: str — The path to find or create within this collection.
create_type: Literal[COLLECTION, FILE] — The type of object to create at
path
if one does not exist. PassingCOLLECTION
creates a stream and returns the correspondingSubcollection
. PassingFILE
creates a new file and returns the correspondingarvados.arvfile.ArvadosFile
.
286 @synchronized 287 def find(self, path: str) -> CollectionItem: 288 """Get the item at the given path 289 290 If `path` refers to a stream in this collection, returns a 291 corresponding `Subcollection` object. If `path` refers to a file in 292 this collection, returns a corresponding 293 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 294 this collection, then this method raises `NotADirectoryError`. 295 296 Arguments: 297 298 * path: str --- The path to find or create within this collection. 299 """ 300 if not path: 301 raise errors.ArgumentError("Parameter 'path' is empty.") 302 303 pathcomponents = path.split("/", 1) 304 if pathcomponents[0] == '': 305 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 306 307 item = self._items.get(pathcomponents[0]) 308 if item is None: 309 return None 310 elif len(pathcomponents) == 1: 311 return item 312 else: 313 if isinstance(item, RichCollectionBase): 314 if pathcomponents[1]: 315 return item.find(pathcomponents[1]) 316 else: 317 return item 318 else: 319 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
Get the item at the given path
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method raises NotADirectoryError
.
Arguments:
- path: str — The path to find or create within this collection.
321 @synchronized 322 def mkdirs(self, path: str) -> 'Subcollection': 323 """Create and return a subcollection at `path` 324 325 If `path` exists within this collection, raises `FileExistsError`. 326 Otherwise, creates a stream at that path and returns the 327 corresponding `Subcollection`. 328 """ 329 if self.find(path) != None: 330 raise IOError(errno.EEXIST, "Directory or file exists", path) 331 332 return self.find_or_create(path, COLLECTION)
Create and return a subcollection at path
If path
exists within this collection, raises FileExistsError
.
Otherwise, creates a stream at that path and returns the
corresponding Subcollection
.
334 def open( 335 self, 336 path: str, 337 mode: str="r", 338 encoding: Optional[str]=None 339 ) -> IO: 340 """Open a file-like object within the collection 341 342 This method returns a file-like object that can read and/or write the 343 file located at `path` within the collection. If you attempt to write 344 a `path` that does not exist, the file is created with `find_or_create`. 345 If the file cannot be opened for any other reason, this method raises 346 `OSError` with an appropriate errno. 347 348 Arguments: 349 350 * path: str --- The path of the file to open within this collection 351 352 * mode: str --- The mode to open this file. Supports all the same 353 values as `builtins.open`. 354 355 * encoding: str | None --- The text encoding of the file. Only used 356 when the file is opened in text mode. The default is 357 platform-dependent. 358 359 """ 360 if not re.search(r'^[rwa][bt]?\+?$', mode): 361 raise errors.ArgumentError("Invalid mode {!r}".format(mode)) 362 363 if mode[0] == 'r' and '+' not in mode: 364 fclass = ArvadosFileReader 365 arvfile = self.find(path) 366 elif not self.writable(): 367 raise IOError(errno.EROFS, "Collection is read only") 368 else: 369 fclass = ArvadosFileWriter 370 arvfile = self.find_or_create(path, FILE) 371 372 if arvfile is None: 373 raise IOError(errno.ENOENT, "File not found", path) 374 if not isinstance(arvfile, ArvadosFile): 375 raise IOError(errno.EISDIR, "Is a directory", path) 376 377 if mode[0] == 'w': 378 arvfile.truncate(0) 379 380 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) 381 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) 382 if 'b' not in mode: 383 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader 384 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) 385 return f
Open a file-like object within the collection
This method returns a file-like object that can read and/or write the
file located at path
within the collection. If you attempt to write
a path
that does not exist, the file is created with find_or_create
.
If the file cannot be opened for any other reason, this method raises
OSError
with an appropriate errno.
Arguments:
path: str — The path of the file to open within this collection
mode: str — The mode to open this file. Supports all the same values as
builtins.open
.encoding: str | None — The text encoding of the file. Only used when the file is opened in text mode. The default is platform-dependent.
387 def modified(self) -> bool: 388 """Indicate whether this collection has an API server record 389 390 Returns `False` if this collection corresponds to a record loaded from 391 the API server, `True` otherwise. 392 """ 393 return not self.committed()
Indicate whether this collection has an API server record
Returns False
if this collection corresponds to a record loaded from
the API server, True
otherwise.
395 @synchronized 396 def committed(self): 397 """Indicate whether this collection has an API server record 398 399 Returns `True` if this collection corresponds to a record loaded from 400 the API server, `False` otherwise. 401 """ 402 return self._committed
Indicate whether this collection has an API server record
Returns True
if this collection corresponds to a record loaded from
the API server, False
otherwise.
404 @synchronized 405 def set_committed(self, value: bool=True): 406 """Cache whether this collection has an API server record 407 408 .. ATTENTION:: Internal 409 This method is only meant to be used by other Collection methods. 410 411 Set this collection's cached "committed" flag to the given 412 value and propagates it as needed. 413 """ 414 if value == self._committed: 415 return 416 if value: 417 for k,v in self._items.items(): 418 v.set_committed(True) 419 self._committed = True 420 else: 421 self._committed = False 422 if self.parent is not None: 423 self.parent.set_committed(False)
Cache whether this collection has an API server record
Set this collection’s cached “committed” flag to the given value and propagates it as needed.
473 @synchronized 474 def keys(self) -> Iterator[str]: 475 """Iterate names of streams and files in this collection 476 477 This method does not recurse. It only iterates the contents of this 478 collection's corresponding stream. 479 """ 480 return self._items.keys()
Iterate names of streams and files in this collection
This method does not recurse. It only iterates the contents of this collection’s corresponding stream.
482 @synchronized 483 def values(self) -> List[CollectionItem]: 484 """Get a list of objects in this collection's stream 485 486 The return value includes a `Subcollection` for every stream, and an 487 `arvados.arvfile.ArvadosFile` for every file, directly within this 488 collection's stream. This method does not recurse. 489 """ 490 return list(self._items.values())
Get a list of objects in this collection’s stream
The return value includes a Subcollection
for every stream, and an
arvados.arvfile.ArvadosFile
for every file, directly within this
collection’s stream. This method does not recurse.
492 @synchronized 493 def items(self) -> List[Tuple[str, CollectionItem]]: 494 """Get a list of `(name, object)` tuples from this collection's stream 495 496 The return value includes a `Subcollection` for every stream, and an 497 `arvados.arvfile.ArvadosFile` for every file, directly within this 498 collection's stream. This method does not recurse. 499 """ 500 return list(self._items.items())
Get a list of (name, object)
tuples from this collection’s stream
The return value includes a Subcollection
for every stream, and an
arvados.arvfile.ArvadosFile
for every file, directly within this
collection’s stream. This method does not recurse.
502 def exists(self, path: str) -> bool: 503 """Indicate whether this collection includes an item at `path` 504 505 This method returns `True` if `path` refers to a stream or file within 506 this collection, else `False`. 507 508 Arguments: 509 510 * path: str --- The path to check for existence within this collection 511 """ 512 return self.find(path) is not None
Indicate whether this collection includes an item at path
This method returns True
if path
refers to a stream or file within
this collection, else False
.
Arguments:
- path: str — The path to check for existence within this collection
514 @must_be_writable 515 @synchronized 516 def remove(self, path: str, recursive: bool=False) -> None: 517 """Remove the file or stream at `path` 518 519 Arguments: 520 521 * path: str --- The path of the item to remove from the collection 522 523 * recursive: bool --- Controls the method's behavior if `path` refers 524 to a nonempty stream. If `False` (the default), this method raises 525 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all 526 items under the stream. 527 """ 528 if not path: 529 raise errors.ArgumentError("Parameter 'path' is empty.") 530 531 pathcomponents = path.split("/", 1) 532 item = self._items.get(pathcomponents[0]) 533 if item is None: 534 raise IOError(errno.ENOENT, "File not found", path) 535 if len(pathcomponents) == 1: 536 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: 537 raise IOError(errno.ENOTEMPTY, "Directory not empty", path) 538 deleteditem = self._items[pathcomponents[0]] 539 del self._items[pathcomponents[0]] 540 self.set_committed(False) 541 self.notify(DEL, self, pathcomponents[0], deleteditem) 542 else: 543 item.remove(pathcomponents[1], recursive=recursive)
Remove the file or stream at path
Arguments:
path: str — The path of the item to remove from the collection
recursive: bool — Controls the method’s behavior if
path
refers to a nonempty stream. IfFalse
(the default), this method raisesOSError
with errnoENOTEMPTY
. IfTrue
, this method removes all items under the stream.
552 @must_be_writable 553 @synchronized 554 def add( 555 self, 556 source_obj: CollectionItem, 557 target_name: str, 558 overwrite: bool=False, 559 reparent: bool=False, 560 ) -> None: 561 """Copy or move a file or subcollection object to this collection 562 563 Arguments: 564 565 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection 566 to add to this collection 567 568 * target_name: str --- The path inside this collection where 569 `source_obj` should be added. 570 571 * overwrite: bool --- Controls the behavior of this method when the 572 collection already contains an object at `target_name`. If `False` 573 (the default), this method will raise `FileExistsError`. If `True`, 574 the object at `target_name` will be replaced with `source_obj`. 575 576 * reparent: bool --- Controls whether this method copies or moves 577 `source_obj`. If `False` (the default), `source_obj` is copied into 578 this collection. If `True`, `source_obj` is moved into this 579 collection. 580 """ 581 if target_name in self and not overwrite: 582 raise IOError(errno.EEXIST, "File already exists", target_name) 583 584 modified_from = None 585 if target_name in self: 586 modified_from = self[target_name] 587 588 # Actually make the move or copy. 589 if reparent: 590 source_obj._reparent(self, target_name) 591 item = source_obj 592 else: 593 item = source_obj.clone(self, target_name) 594 595 self._items[target_name] = item 596 self.set_committed(False) 597 if not self._has_remote_blocks and source_obj.has_remote_blocks(): 598 self.set_has_remote_blocks(True) 599 600 if modified_from: 601 self.notify(MOD, self, target_name, (modified_from, item)) 602 else: 603 self.notify(ADD, self, target_name, item)
Copy or move a file or subcollection object to this collection
Arguments:
source_obj: arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection
target_name: str — The path inside this collection where
source_obj
should be added.overwrite: bool — Controls the behavior of this method when the collection already contains an object at
target_name
. IfFalse
(the default), this method will raiseFileExistsError
. IfTrue
, the object attarget_name
will be replaced withsource_obj
.reparent: bool — Controls whether this method copies or moves
source_obj
. IfFalse
(the default),source_obj
is copied into this collection. IfTrue
,source_obj
is moved into this collection.
645 @must_be_writable 646 @synchronized 647 def copy( 648 self, 649 source: Union[str, CollectionItem], 650 target_path: str, 651 source_collection: Optional['RichCollectionBase']=None, 652 overwrite: bool=False, 653 ) -> None: 654 """Copy a file or subcollection object to this collection 655 656 Arguments: 657 658 * source: str | arvados.arvfile.ArvadosFile | 659 arvados.collection.Subcollection --- The file or subcollection to 660 add to this collection. If `source` is a str, the object will be 661 found by looking up this path from `source_collection` (see 662 below). 663 664 * target_path: str --- The path inside this collection where the 665 source object should be added. 666 667 * source_collection: arvados.collection.Collection | None --- The 668 collection to find the source object from when `source` is a 669 path. Defaults to the current collection (`self`). 670 671 * overwrite: bool --- Controls the behavior of this method when the 672 collection already contains an object at `target_path`. If `False` 673 (the default), this method will raise `FileExistsError`. If `True`, 674 the object at `target_path` will be replaced with `source_obj`. 675 """ 676 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) 677 target_dir.add(source_obj, target_name, overwrite, False)
Copy a file or subcollection object to this collection
Arguments:
source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If
source
is a str, the object will be found by looking up this path fromsource_collection
(see below).target_path: str — The path inside this collection where the source object should be added.
source_collection: Collection | None — The collection to find the source object from when
source
is a path. Defaults to the current collection (self
).overwrite: bool — Controls the behavior of this method when the collection already contains an object at
target_path
. IfFalse
(the default), this method will raiseFileExistsError
. IfTrue
, the object attarget_path
will be replaced withsource_obj
.
679 @must_be_writable 680 @synchronized 681 def rename( 682 self, 683 source: Union[str, CollectionItem], 684 target_path: str, 685 source_collection: Optional['RichCollectionBase']=None, 686 overwrite: bool=False, 687 ) -> None: 688 """Move a file or subcollection object to this collection 689 690 Arguments: 691 692 * source: str | arvados.arvfile.ArvadosFile | 693 arvados.collection.Subcollection --- The file or subcollection to 694 add to this collection. If `source` is a str, the object will be 695 found by looking up this path from `source_collection` (see 696 below). 697 698 * target_path: str --- The path inside this collection where the 699 source object should be added. 700 701 * source_collection: arvados.collection.Collection | None --- The 702 collection to find the source object from when `source` is a 703 path. Defaults to the current collection (`self`). 704 705 * overwrite: bool --- Controls the behavior of this method when the 706 collection already contains an object at `target_path`. If `False` 707 (the default), this method will raise `FileExistsError`. If `True`, 708 the object at `target_path` will be replaced with `source_obj`. 709 """ 710 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) 711 if not source_obj.writable(): 712 raise IOError(errno.EROFS, "Source collection is read only", source) 713 target_dir.add(source_obj, target_name, overwrite, True)
Move a file or subcollection object to this collection
Arguments:
source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If
source
is a str, the object will be found by looking up this path fromsource_collection
(see below).target_path: str — The path inside this collection where the source object should be added.
source_collection: Collection | None — The collection to find the source object from when
source
is a path. Defaults to the current collection (self
).overwrite: bool — Controls the behavior of this method when the collection already contains an object at
target_path
. IfFalse
(the default), this method will raiseFileExistsError
. IfTrue
, the object attarget_path
will be replaced withsource_obj
.
715 def portable_manifest_text(self, stream_name: str=".") -> str: 716 """Get the portable manifest text for this collection 717 718 The portable manifest text is normalized, and does not include access 719 tokens. This method does not flush outstanding blocks to Keep. 720 721 Arguments: 722 723 * stream_name: str --- The name to use for this collection's stream in 724 the generated manifest. Default `'.'`. 725 """ 726 return self._get_manifest_text(stream_name, True, True)
Get the portable manifest text for this collection
The portable manifest text is normalized, and does not include access tokens. This method does not flush outstanding blocks to Keep.
Arguments:
- stream_name: str — The name to use for this collection’s stream in
the generated manifest. Default
'.'
.
728 @synchronized 729 def manifest_text( 730 self, 731 stream_name: str=".", 732 strip: bool=False, 733 normalize: bool=False, 734 only_committed: bool=False, 735 ) -> str: 736 """Get the manifest text for this collection 737 738 Arguments: 739 740 * stream_name: str --- The name to use for this collection's stream in 741 the generated manifest. Default `'.'`. 742 743 * strip: bool --- Controls whether or not the returned manifest text 744 includes access tokens. If `False` (the default), the manifest text 745 will include access tokens. If `True`, the manifest text will not 746 include access tokens. 747 748 * normalize: bool --- Controls whether or not the returned manifest 749 text is normalized. Default `False`. 750 751 * only_committed: bool --- Controls whether or not this method uploads 752 pending data to Keep before building and returning the manifest text. 753 If `False` (the default), this method will finish uploading all data 754 to Keep, then return the final manifest. If `True`, this method will 755 build and return a manifest that only refers to the data that has 756 finished uploading at the time this method was called. 757 """ 758 if not only_committed: 759 self._my_block_manager().commit_all() 760 return self._get_manifest_text(stream_name, strip, normalize, 761 only_committed=only_committed)
Get the manifest text for this collection
Arguments:
stream_name: str — The name to use for this collection’s stream in the generated manifest. Default
'.'
.strip: bool — Controls whether or not the returned manifest text includes access tokens. If
False
(the default), the manifest text will include access tokens. IfTrue
, the manifest text will not include access tokens.normalize: bool — Controls whether or not the returned manifest text is normalized. Default
False
.only_committed: bool — Controls whether or not this method uploads pending data to Keep before building and returning the manifest text. If
False
(the default), this method will finish uploading all data to Keep, then return the final manifest. IfTrue
, this method will build and return a manifest that only refers to the data that has finished uploading at the time this method was called.
839 @synchronized 840 def diff( 841 self, 842 end_collection: 'RichCollectionBase', 843 prefix: str=".", 844 holding_collection: Optional['Collection']=None, 845 ) -> ChangeList: 846 """Build a list of differences between this collection and another 847 848 Arguments: 849 850 * end_collection: arvados.collection.RichCollectionBase --- A 851 collection object with the desired end state. The returned diff 852 list will describe how to go from the current collection object 853 `self` to `end_collection`. 854 855 * prefix: str --- The name to use for this collection's stream in 856 the diff list. Default `'.'`. 857 858 * holding_collection: arvados.collection.Collection | None --- A 859 collection object used to hold objects for the returned diff 860 list. By default, a new empty collection is created. 861 """ 862 changes = [] 863 if holding_collection is None: 864 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep()) 865 for k in self: 866 if k not in end_collection: 867 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, ""))) 868 for k in end_collection: 869 if k in self: 870 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection): 871 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection)) 872 elif end_collection[k] != self[k]: 873 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 874 else: 875 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 876 else: 877 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, ""))) 878 return changes
Build a list of differences between this collection and another
Arguments:
end_collection: RichCollectionBase — A collection object with the desired end state. The returned diff list will describe how to go from the current collection object
self
toend_collection
.prefix: str — The name to use for this collection’s stream in the diff list. Default
'.'
.holding_collection: Collection | None — A collection object used to hold objects for the returned diff list. By default, a new empty collection is created.
880 @must_be_writable 881 @synchronized 882 def apply(self, changes: ChangeList) -> None: 883 """Apply a list of changes from to this collection 884 885 This method takes a list of changes generated by 886 `RichCollectionBase.diff` and applies it to this 887 collection. Afterward, the state of this collection object will 888 match the state of `end_collection` passed to `diff`. If a change 889 conflicts with a local change, it will be saved to an alternate path 890 indicating the conflict. 891 892 Arguments: 893 894 * changes: arvados.collection.ChangeList --- The list of differences 895 generated by `RichCollectionBase.diff`. 896 """ 897 if changes: 898 self.set_committed(False) 899 for change in changes: 900 event_type = change[0] 901 path = change[1] 902 initial = change[2] 903 local = self.find(path) 904 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S", 905 time.gmtime())) 906 if event_type == ADD: 907 if local is None: 908 # No local file at path, safe to copy over new file 909 self.copy(initial, path) 910 elif local is not None and local != initial: 911 # There is already local file and it is different: 912 # save change to conflict file. 913 self.copy(initial, conflictpath) 914 elif event_type == MOD or event_type == TOK: 915 final = change[3] 916 if local == initial: 917 # Local matches the "initial" item so it has not 918 # changed locally and is safe to update. 919 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile): 920 # Replace contents of local file with new contents 921 local.replace_contents(final) 922 else: 923 # Overwrite path with new item; this can happen if 924 # path was a file and is now a collection or vice versa 925 self.copy(final, path, overwrite=True) 926 else: 927 # Local is missing (presumably deleted) or local doesn't 928 # match the "start" value, so save change to conflict file 929 self.copy(final, conflictpath) 930 elif event_type == DEL: 931 if local == initial: 932 # Local item matches "initial" value, so it is safe to remove. 933 self.remove(path, recursive=True) 934 # else, the file is modified or already removed, in either 935 # case we don't want to try to remove it.
Apply a list of changes from to this collection
This method takes a list of changes generated by
RichCollectionBase.diff
and applies it to this
collection. Afterward, the state of this collection object will
match the state of end_collection
passed to diff
. If a change
conflicts with a local change, it will be saved to an alternate path
indicating the conflict.
Arguments:
- changes: arvados.collection.ChangeList — The list of differences
generated by
RichCollectionBase.diff
.
937 def portable_data_hash(self) -> str: 938 """Get the portable data hash for this collection's manifest""" 939 if self._manifest_locator and self.committed(): 940 # If the collection is already saved on the API server, and it's committed 941 # then return API server's PDH response. 942 return self._portable_data_hash 943 else: 944 stripped = self.portable_manifest_text().encode() 945 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
Get the portable data hash for this collection’s manifest
947 @synchronized 948 def subscribe(self, callback: ChangeCallback) -> None: 949 """Set a notify callback for changes to this collection 950 951 Arguments: 952 953 * callback: arvados.collection.ChangeCallback --- The callable to 954 call each time the collection is changed. 955 """ 956 if self._callback is None: 957 self._callback = callback 958 else: 959 raise errors.ArgumentError("A callback is already set on this collection.")
Set a notify callback for changes to this collection
Arguments:
- callback: arvados.collection.ChangeCallback — The callable to call each time the collection is changed.
961 @synchronized 962 def unsubscribe(self) -> None: 963 """Remove any notify callback set for changes to this collection""" 964 if self._callback is not None: 965 self._callback = None
Remove any notify callback set for changes to this collection
967 @synchronized 968 def notify( 969 self, 970 event: ChangeType, 971 collection: 'RichCollectionBase', 972 name: str, 973 item: CollectionItem, 974 ) -> None: 975 """Notify any subscribed callback about a change to this collection 976 977 .. ATTENTION:: Internal 978 This method is only meant to be used by other Collection methods. 979 980 If a callback has been registered with `RichCollectionBase.subscribe`, 981 it will be called with information about a change to this collection. 982 Then this notification will be propagated to this collection's root. 983 984 Arguments: 985 986 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to 987 the collection. 988 989 * collection: arvados.collection.RichCollectionBase --- The 990 collection that was modified. 991 992 * name: str --- The name of the file or stream within `collection` that 993 was modified. 994 995 * item: arvados.arvfile.ArvadosFile | 996 arvados.collection.Subcollection --- The new contents at `name` 997 within `collection`. 998 """ 999 if self._callback: 1000 self._callback(event, collection, name, item) 1001 self.root_collection().notify(event, collection, name, item)
Notify any subscribed callback about a change to this collection
If a callback has been registered with RichCollectionBase.subscribe
,
it will be called with information about a change to this collection.
Then this notification will be propagated to this collection’s root.
Arguments:
event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.
collection: RichCollectionBase — The collection that was modified.
name: str — The name of the file or stream within
collection
that was modified.item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at
name
withincollection
.
1023 @synchronized 1024 def flush(self) -> None: 1025 """Upload any pending data to Keep""" 1026 for e in self.values(): 1027 e.flush()
Upload any pending data to Keep
Inherited Members
1030class Collection(RichCollectionBase): 1031 """Read and manipulate an Arvados collection 1032 1033 This class provides a high-level interface to create, read, and update 1034 Arvados collections and their contents. Refer to the Arvados Python SDK 1035 cookbook for [an introduction to using the Collection class][cookbook]. 1036 1037 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 1038 """ 1039 1040 def __init__(self, manifest_locator_or_text: Optional[str]=None, 1041 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, 1042 keep_client: Optional['arvados.keep.KeepClient']=None, 1043 num_retries: int=10, 1044 parent: Optional['Collection']=None, 1045 apiconfig: Optional[Mapping[str, str]]=None, 1046 block_manager: Optional['arvados.arvfile._BlockManager']=None, 1047 replication_desired: Optional[int]=None, 1048 storage_classes_desired: Optional[List[str]]=None, 1049 put_threads: Optional[int]=None): 1050 """Initialize a Collection object 1051 1052 Arguments: 1053 1054 * manifest_locator_or_text: str | None --- This string can contain a 1055 collection manifest text, portable data hash, or UUID. When given a 1056 portable data hash or UUID, this instance will load a collection 1057 record from the API server. Otherwise, this instance will represent a 1058 new collection without an API server record. The default value `None` 1059 instantiates a new collection with an empty manifest. 1060 1061 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The 1062 Arvados API client object this instance uses to make requests. If 1063 none is given, this instance creates its own client using the 1064 settings from `apiconfig` (see below). If your client instantiates 1065 many Collection objects, you can help limit memory utilization by 1066 calling `arvados.api.api` to construct an 1067 `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client` 1068 for every Collection. 1069 1070 * keep_client: arvados.keep.KeepClient | None --- The Keep client 1071 object this instance uses to make requests. If none is given, this 1072 instance creates its own client using its `api_client`. 1073 1074 * num_retries: int --- The number of times that client requests are 1075 retried. Default 10. 1076 1077 * parent: arvados.collection.Collection | None --- The parent Collection 1078 object of this instance, if any. This argument is primarily used by 1079 other Collection methods; user client code shouldn't need to use it. 1080 1081 * apiconfig: Mapping[str, str] | None --- A mapping with entries for 1082 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally 1083 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the 1084 Collection object constructs one from these settings. If no 1085 mapping is provided, calls `arvados.config.settings` to get these 1086 parameters from user configuration. 1087 1088 * block_manager: arvados.arvfile._BlockManager | None --- The 1089 _BlockManager object used by this instance to coordinate reading 1090 and writing Keep data blocks. If none is given, this instance 1091 constructs its own. This argument is primarily used by other 1092 Collection methods; user client code shouldn't need to use it. 1093 1094 * replication_desired: int | None --- This controls both the value of 1095 the `replication_desired` field on API collection records saved by 1096 this class, as well as the number of Keep services that the object 1097 writes new data blocks to. If none is given, uses the default value 1098 configured for the cluster. 1099 1100 * storage_classes_desired: list[str] | None --- This controls both 1101 the value of the `storage_classes_desired` field on API collection 1102 records saved by this class, as well as selecting which specific 1103 Keep services the object writes new data blocks to. If none is 1104 given, defaults to an empty list. 1105 1106 * put_threads: int | None --- The number of threads to run 1107 simultaneously to upload data blocks to Keep. This value is used when 1108 building a new `block_manager`. It is unused when a `block_manager` 1109 is provided. 1110 """ 1111 1112 if storage_classes_desired and type(storage_classes_desired) is not list: 1113 raise errors.ArgumentError("storage_classes_desired must be list type.") 1114 1115 super(Collection, self).__init__(parent) 1116 self._api_client = api_client 1117 self._keep_client = keep_client 1118 1119 # Use the keep client from ThreadSafeAPIClient 1120 if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient): 1121 self._keep_client = self._api_client.keep 1122 1123 self._block_manager = block_manager 1124 self.replication_desired = replication_desired 1125 self._storage_classes_desired = storage_classes_desired 1126 self.put_threads = put_threads 1127 1128 if apiconfig: 1129 self._config = apiconfig 1130 else: 1131 self._config = config.settings() 1132 1133 self.num_retries = num_retries 1134 self._manifest_locator = None 1135 self._manifest_text = None 1136 self._portable_data_hash = None 1137 self._api_response = None 1138 self._past_versions = set() 1139 1140 self.lock = threading.RLock() 1141 self.events = None 1142 1143 if manifest_locator_or_text: 1144 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1145 self._manifest_locator = manifest_locator_or_text 1146 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1147 self._manifest_locator = manifest_locator_or_text 1148 if not self._has_local_collection_uuid(): 1149 self._has_remote_blocks = True 1150 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1151 self._manifest_text = manifest_locator_or_text 1152 if '+R' in self._manifest_text: 1153 self._has_remote_blocks = True 1154 else: 1155 raise errors.ArgumentError( 1156 "Argument to CollectionReader is not a manifest or a collection UUID") 1157 1158 try: 1159 self._populate() 1160 except errors.SyntaxError as e: 1161 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None 1162 1163 def storage_classes_desired(self) -> List[str]: 1164 """Get this collection's `storage_classes_desired` value""" 1165 return self._storage_classes_desired or [] 1166 1167 def root_collection(self) -> 'Collection': 1168 return self 1169 1170 def get_properties(self) -> Properties: 1171 """Get this collection's properties 1172 1173 This method always returns a dict. If this collection object does not 1174 have an associated API record, or that record does not have any 1175 properties set, this method returns an empty dict. 1176 """ 1177 if self._api_response and self._api_response["properties"]: 1178 return self._api_response["properties"] 1179 else: 1180 return {} 1181 1182 def get_trash_at(self) -> Optional[datetime.datetime]: 1183 """Get this collection's `trash_at` field 1184 1185 This method parses the `trash_at` field of the collection's API 1186 record and returns a datetime from it. If that field is not set, or 1187 this collection object does not have an associated API record, 1188 returns None. 1189 """ 1190 if self._api_response and self._api_response["trash_at"]: 1191 try: 1192 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1193 except ValueError: 1194 return None 1195 else: 1196 return None 1197 1198 def stream_name(self) -> str: 1199 return "." 1200 1201 def writable(self) -> bool: 1202 return True 1203 1204 @synchronized 1205 def known_past_version( 1206 self, 1207 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] 1208 ) -> bool: 1209 """Indicate whether an API record for this collection has been seen before 1210 1211 As this collection object loads records from the API server, it records 1212 their `modified_at` and `portable_data_hash` fields. This method accepts 1213 a 2-tuple with values for those fields, and returns `True` if the 1214 combination was previously loaded. 1215 """ 1216 return modified_at_and_portable_data_hash in self._past_versions 1217 1218 @synchronized 1219 @retry_method 1220 def update( 1221 self, 1222 other: Optional['Collection']=None, 1223 num_retries: Optional[int]=None, 1224 ) -> None: 1225 """Merge another collection's contents into this one 1226 1227 This method compares the manifest of this collection instance with 1228 another, then updates this instance's manifest with changes from the 1229 other, renaming files to flag conflicts where necessary. 1230 1231 When called without any arguments, this method reloads the collection's 1232 API record, and updates this instance with any changes that have 1233 appeared server-side. If this instance does not have a corresponding 1234 API record, this method raises `arvados.errors.ArgumentError`. 1235 1236 Arguments: 1237 1238 * other: arvados.collection.Collection | None --- The collection 1239 whose contents should be merged into this instance. When not 1240 provided, this method reloads this collection's API record and 1241 constructs a Collection object from it. If this instance does not 1242 have a corresponding API record, this method raises 1243 `arvados.errors.ArgumentError`. 1244 1245 * num_retries: int | None --- The number of times to retry reloading 1246 the collection's API record from the API server. If not specified, 1247 uses the `num_retries` provided when this instance was constructed. 1248 """ 1249 if other is None: 1250 if self._manifest_locator is None: 1251 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1252 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1253 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and 1254 response.get("portable_data_hash") != self.portable_data_hash()): 1255 # The record on the server is different from our current one, but we've seen it before, 1256 # so ignore it because it's already been merged. 1257 # However, if it's the same as our current record, proceed with the update, because we want to update 1258 # our tokens. 1259 return 1260 else: 1261 self._remember_api_response(response) 1262 other = CollectionReader(response["manifest_text"]) 1263 baseline = CollectionReader(self._manifest_text) 1264 self.apply(baseline.diff(other)) 1265 self._manifest_text = self.manifest_text() 1266 1267 @synchronized 1268 def _my_api(self): 1269 if self._api_client is None: 1270 self._api_client = ThreadSafeAPIClient(self._config, version='v1') 1271 if self._keep_client is None: 1272 self._keep_client = self._api_client.keep 1273 return self._api_client 1274 1275 @synchronized 1276 def _my_keep(self): 1277 if self._keep_client is None: 1278 if self._api_client is None: 1279 self._my_api() 1280 else: 1281 self._keep_client = KeepClient(api_client=self._api_client) 1282 return self._keep_client 1283 1284 @synchronized 1285 def _my_block_manager(self): 1286 if self._block_manager is None: 1287 copies = (self.replication_desired or 1288 self._my_api()._rootDesc.get('defaultCollectionReplication', 1289 2)) 1290 self._block_manager = _BlockManager(self._my_keep(), 1291 copies=copies, 1292 put_threads=self.put_threads, 1293 num_retries=self.num_retries, 1294 storage_classes_func=self.storage_classes_desired) 1295 return self._block_manager 1296 1297 def _remember_api_response(self, response): 1298 self._api_response = response 1299 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) 1300 1301 def _populate_from_api_server(self): 1302 # As in KeepClient itself, we must wait until the last 1303 # possible moment to instantiate an API client, in order to 1304 # avoid tripping up clients that don't have access to an API 1305 # server. If we do build one, make sure our Keep client uses 1306 # it. If instantiation fails, we'll fall back to the except 1307 # clause, just like any other Collection lookup 1308 # failure. Return an exception, or None if successful. 1309 self._remember_api_response(self._my_api().collections().get( 1310 uuid=self._manifest_locator).execute( 1311 num_retries=self.num_retries)) 1312 self._manifest_text = self._api_response['manifest_text'] 1313 self._portable_data_hash = self._api_response['portable_data_hash'] 1314 # If not overriden via kwargs, we should try to load the 1315 # replication_desired and storage_classes_desired from the API server 1316 if self.replication_desired is None: 1317 self.replication_desired = self._api_response.get('replication_desired', None) 1318 if self._storage_classes_desired is None: 1319 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None) 1320 1321 def _populate(self): 1322 if self._manifest_text is None: 1323 if self._manifest_locator is None: 1324 return 1325 else: 1326 self._populate_from_api_server() 1327 self._baseline_manifest = self._manifest_text 1328 self._import_manifest(self._manifest_text) 1329 1330 def _has_collection_uuid(self): 1331 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) 1332 1333 def _has_local_collection_uuid(self): 1334 return self._has_collection_uuid and \ 1335 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0] 1336 1337 def __enter__(self): 1338 return self 1339 1340 def __exit__(self, exc_type, exc_value, traceback): 1341 """Exit a context with this collection instance 1342 1343 If no exception was raised inside the context block, and this 1344 collection is writable and has a corresponding API record, that 1345 record will be updated to match the state of this instance at the end 1346 of the block. 1347 """ 1348 if exc_type is None: 1349 if self.writable() and self._has_collection_uuid(): 1350 self.save() 1351 self.stop_threads() 1352 1353 def stop_threads(self) -> None: 1354 """Stop background Keep upload/download threads""" 1355 if self._block_manager is not None: 1356 self._block_manager.stop_threads() 1357 1358 @synchronized 1359 def manifest_locator(self) -> Optional[str]: 1360 """Get this collection's manifest locator, if any 1361 1362 * If this collection instance is associated with an API record with a 1363 UUID, return that. 1364 * Otherwise, if this collection instance was loaded from an API record 1365 by portable data hash, return that. 1366 * Otherwise, return `None`. 1367 """ 1368 return self._manifest_locator 1369 1370 @synchronized 1371 def clone( 1372 self, 1373 new_parent: Optional['Collection']=None, 1374 new_name: Optional[str]=None, 1375 readonly: bool=False, 1376 new_config: Optional[Mapping[str, str]]=None, 1377 ) -> 'Collection': 1378 """Create a Collection object with the same contents as this instance 1379 1380 This method creates a new Collection object with contents that match 1381 this instance's. The new collection will not be associated with any API 1382 record. 1383 1384 Arguments: 1385 1386 * new_parent: arvados.collection.Collection | None --- This value is 1387 passed to the new Collection's constructor as the `parent` 1388 argument. 1389 1390 * new_name: str | None --- This value is unused. 1391 1392 * readonly: bool --- If this value is true, this method constructs and 1393 returns a `CollectionReader`. Otherwise, it returns a mutable 1394 `Collection`. Default `False`. 1395 1396 * new_config: Mapping[str, str] | None --- This value is passed to the 1397 new Collection's constructor as `apiconfig`. If no value is provided, 1398 defaults to the configuration passed to this instance's constructor. 1399 """ 1400 if new_config is None: 1401 new_config = self._config 1402 if readonly: 1403 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1404 else: 1405 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1406 1407 newcollection._clonefrom(self) 1408 return newcollection 1409 1410 @synchronized 1411 def api_response(self) -> Optional[Dict[str, Any]]: 1412 """Get this instance's associated API record 1413 1414 If this Collection instance has an associated API record, return it. 1415 Otherwise, return `None`. 1416 """ 1417 return self._api_response 1418 1419 def find_or_create( 1420 self, 1421 path: str, 1422 create_type: CreateType, 1423 ) -> CollectionItem: 1424 if path == ".": 1425 return self 1426 else: 1427 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type) 1428 1429 def find(self, path: str) -> CollectionItem: 1430 if path == ".": 1431 return self 1432 else: 1433 return super(Collection, self).find(path[2:] if path.startswith("./") else path) 1434 1435 def remove(self, path: str, recursive: bool=False) -> None: 1436 if path == ".": 1437 raise errors.ArgumentError("Cannot remove '.'") 1438 else: 1439 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive) 1440 1441 @must_be_writable 1442 @synchronized 1443 @retry_method 1444 def save( 1445 self, 1446 properties: Optional[Properties]=None, 1447 storage_classes: Optional[StorageClasses]=None, 1448 trash_at: Optional[datetime.datetime]=None, 1449 merge: bool=True, 1450 num_retries: Optional[int]=None, 1451 preserve_version: bool=False, 1452 ) -> str: 1453 """Save collection to an existing API record 1454 1455 This method updates the instance's corresponding API record to match 1456 the instance's state. If this instance does not have a corresponding API 1457 record yet, raises `AssertionError`. (To create a new API record, use 1458 `Collection.save_new`.) This method returns the saved collection 1459 manifest. 1460 1461 Arguments: 1462 1463 * properties: dict[str, Any] | None --- If provided, the API record will 1464 be updated with these properties. Note this will completely replace 1465 any existing properties. 1466 1467 * storage_classes: list[str] | None --- If provided, the API record will 1468 be updated with this value in the `storage_classes_desired` field. 1469 This value will also be saved on the instance and used for any 1470 changes that follow. 1471 1472 * trash_at: datetime.datetime | None --- If provided, the API record 1473 will be updated with this value in the `trash_at` field. 1474 1475 * merge: bool --- If `True` (the default), this method will first 1476 reload this collection's API record, and merge any new contents into 1477 this instance before saving changes. See `Collection.update` for 1478 details. 1479 1480 * num_retries: int | None --- The number of times to retry reloading 1481 the collection's API record from the API server. If not specified, 1482 uses the `num_retries` provided when this instance was constructed. 1483 1484 * preserve_version: bool --- This value will be passed to directly 1485 to the underlying API call. If `True`, the Arvados API will 1486 preserve the versions of this collection both immediately before 1487 and after the update. If `True` when the API server is not 1488 configured with collection versioning, this method raises 1489 `arvados.errors.ArgumentError`. 1490 """ 1491 if properties and type(properties) is not dict: 1492 raise errors.ArgumentError("properties must be dictionary type.") 1493 1494 if storage_classes and type(storage_classes) is not list: 1495 raise errors.ArgumentError("storage_classes must be list type.") 1496 if storage_classes: 1497 self._storage_classes_desired = storage_classes 1498 1499 if trash_at and type(trash_at) is not datetime.datetime: 1500 raise errors.ArgumentError("trash_at must be datetime type.") 1501 1502 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1503 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1504 1505 body={} 1506 if properties: 1507 body["properties"] = properties 1508 if self.storage_classes_desired(): 1509 body["storage_classes_desired"] = self.storage_classes_desired() 1510 if trash_at: 1511 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1512 body["trash_at"] = t 1513 if preserve_version: 1514 body["preserve_version"] = preserve_version 1515 1516 if not self.committed(): 1517 if self._has_remote_blocks: 1518 # Copy any remote blocks to the local cluster. 1519 self._copy_remote_blocks(remote_blocks={}) 1520 self._has_remote_blocks = False 1521 if not self._has_collection_uuid(): 1522 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") 1523 elif not self._has_local_collection_uuid(): 1524 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.") 1525 1526 self._my_block_manager().commit_all() 1527 1528 if merge: 1529 self.update() 1530 1531 text = self.manifest_text(strip=False) 1532 body['manifest_text'] = text 1533 1534 self._remember_api_response(self._my_api().collections().update( 1535 uuid=self._manifest_locator, 1536 body=body 1537 ).execute(num_retries=num_retries)) 1538 self._manifest_text = self._api_response["manifest_text"] 1539 self._portable_data_hash = self._api_response["portable_data_hash"] 1540 self.set_committed(True) 1541 elif body: 1542 self._remember_api_response(self._my_api().collections().update( 1543 uuid=self._manifest_locator, 1544 body=body 1545 ).execute(num_retries=num_retries)) 1546 1547 return self._manifest_text 1548 1549 1550 @must_be_writable 1551 @synchronized 1552 @retry_method 1553 def save_new( 1554 self, 1555 name: Optional[str]=None, 1556 create_collection_record: bool=True, 1557 owner_uuid: Optional[str]=None, 1558 properties: Optional[Properties]=None, 1559 storage_classes: Optional[StorageClasses]=None, 1560 trash_at: Optional[datetime.datetime]=None, 1561 ensure_unique_name: bool=False, 1562 num_retries: Optional[int]=None, 1563 preserve_version: bool=False, 1564 ): 1565 """Save collection to a new API record 1566 1567 This method finishes uploading new data blocks and (optionally) 1568 creates a new API collection record with the provided data. If a new 1569 record is created, this instance becomes associated with that record 1570 for future updates like `save()`. This method returns the saved 1571 collection manifest. 1572 1573 Arguments: 1574 1575 * name: str | None --- The `name` field to use on the new collection 1576 record. If not specified, a generic default name is generated. 1577 1578 * create_collection_record: bool --- If `True` (the default), creates a 1579 collection record on the API server. If `False`, the method finishes 1580 all data uploads and only returns the resulting collection manifest 1581 without sending it to the API server. 1582 1583 * owner_uuid: str | None --- The `owner_uuid` field to use on the 1584 new collection record. 1585 1586 * properties: dict[str, Any] | None --- The `properties` field to use on 1587 the new collection record. 1588 1589 * storage_classes: list[str] | None --- The 1590 `storage_classes_desired` field to use on the new collection record. 1591 1592 * trash_at: datetime.datetime | None --- The `trash_at` field to use 1593 on the new collection record. 1594 1595 * ensure_unique_name: bool --- This value is passed directly to the 1596 Arvados API when creating the collection record. If `True`, the API 1597 server may modify the submitted `name` to ensure the collection's 1598 `name`+`owner_uuid` combination is unique. If `False` (the default), 1599 if a collection already exists with this same `name`+`owner_uuid` 1600 combination, creating a collection record will raise a validation 1601 error. 1602 1603 * num_retries: int | None --- The number of times to retry reloading 1604 the collection's API record from the API server. If not specified, 1605 uses the `num_retries` provided when this instance was constructed. 1606 1607 * preserve_version: bool --- This value will be passed to directly 1608 to the underlying API call. If `True`, the Arvados API will 1609 preserve the versions of this collection both immediately before 1610 and after the update. If `True` when the API server is not 1611 configured with collection versioning, this method raises 1612 `arvados.errors.ArgumentError`. 1613 """ 1614 if properties and type(properties) is not dict: 1615 raise errors.ArgumentError("properties must be dictionary type.") 1616 1617 if storage_classes and type(storage_classes) is not list: 1618 raise errors.ArgumentError("storage_classes must be list type.") 1619 1620 if trash_at and type(trash_at) is not datetime.datetime: 1621 raise errors.ArgumentError("trash_at must be datetime type.") 1622 1623 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1624 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1625 1626 if self._has_remote_blocks: 1627 # Copy any remote blocks to the local cluster. 1628 self._copy_remote_blocks(remote_blocks={}) 1629 self._has_remote_blocks = False 1630 1631 if storage_classes: 1632 self._storage_classes_desired = storage_classes 1633 1634 self._my_block_manager().commit_all() 1635 text = self.manifest_text(strip=False) 1636 1637 if create_collection_record: 1638 if name is None: 1639 name = "New collection" 1640 ensure_unique_name = True 1641 1642 body = {"manifest_text": text, 1643 "name": name, 1644 "replication_desired": self.replication_desired} 1645 if owner_uuid: 1646 body["owner_uuid"] = owner_uuid 1647 if properties: 1648 body["properties"] = properties 1649 if self.storage_classes_desired(): 1650 body["storage_classes_desired"] = self.storage_classes_desired() 1651 if trash_at: 1652 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1653 body["trash_at"] = t 1654 if preserve_version: 1655 body["preserve_version"] = preserve_version 1656 1657 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) 1658 text = self._api_response["manifest_text"] 1659 1660 self._manifest_locator = self._api_response["uuid"] 1661 self._portable_data_hash = self._api_response["portable_data_hash"] 1662 1663 self._manifest_text = text 1664 self.set_committed(True) 1665 1666 return text 1667 1668 _token_re = re.compile(r'(\S+)(\s+|$)') 1669 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') 1670 _segment_re = re.compile(r'(\d+):(\d+):(\S+)') 1671 1672 def _unescape_manifest_path(self, path): 1673 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) 1674 1675 @synchronized 1676 def _import_manifest(self, manifest_text): 1677 """Import a manifest into a `Collection`. 1678 1679 :manifest_text: 1680 The manifest text to import from. 1681 1682 """ 1683 if len(self) > 0: 1684 raise ArgumentError("Can only import manifest into an empty collection") 1685 1686 STREAM_NAME = 0 1687 BLOCKS = 1 1688 SEGMENTS = 2 1689 1690 stream_name = None 1691 state = STREAM_NAME 1692 1693 for token_and_separator in self._token_re.finditer(manifest_text): 1694 tok = token_and_separator.group(1) 1695 sep = token_and_separator.group(2) 1696 1697 if state == STREAM_NAME: 1698 # starting a new stream 1699 stream_name = self._unescape_manifest_path(tok) 1700 blocks = [] 1701 segments = [] 1702 streamoffset = 0 1703 state = BLOCKS 1704 self.find_or_create(stream_name, COLLECTION) 1705 continue 1706 1707 if state == BLOCKS: 1708 block_locator = self._block_re.match(tok) 1709 if block_locator: 1710 blocksize = int(block_locator.group(1)) 1711 blocks.append(streams.Range(tok, streamoffset, blocksize, 0)) 1712 streamoffset += blocksize 1713 else: 1714 state = SEGMENTS 1715 1716 if state == SEGMENTS: 1717 file_segment = self._segment_re.match(tok) 1718 if file_segment: 1719 pos = int(file_segment.group(1)) 1720 size = int(file_segment.group(2)) 1721 name = self._unescape_manifest_path(file_segment.group(3)) 1722 if name.split('/')[-1] == '.': 1723 # placeholder for persisting an empty directory, not a real file 1724 if len(name) > 2: 1725 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) 1726 else: 1727 filepath = os.path.join(stream_name, name) 1728 try: 1729 afile = self.find_or_create(filepath, FILE) 1730 except IOError as e: 1731 if e.errno == errno.ENOTDIR: 1732 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None 1733 else: 1734 raise e from None 1735 if isinstance(afile, ArvadosFile): 1736 afile.add_segment(blocks, pos, size) 1737 else: 1738 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) 1739 else: 1740 # error! 1741 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok) 1742 1743 if sep == "\n": 1744 stream_name = None 1745 state = STREAM_NAME 1746 1747 self.set_committed(True) 1748 1749 @synchronized 1750 def notify( 1751 self, 1752 event: ChangeType, 1753 collection: 'RichCollectionBase', 1754 name: str, 1755 item: CollectionItem, 1756 ) -> None: 1757 if self._callback: 1758 self._callback(event, collection, name, item)
Read and manipulate an Arvados collection
This class provides a high-level interface to create, read, and update Arvados collections and their contents. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.
1040 def __init__(self, manifest_locator_or_text: Optional[str]=None, 1041 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, 1042 keep_client: Optional['arvados.keep.KeepClient']=None, 1043 num_retries: int=10, 1044 parent: Optional['Collection']=None, 1045 apiconfig: Optional[Mapping[str, str]]=None, 1046 block_manager: Optional['arvados.arvfile._BlockManager']=None, 1047 replication_desired: Optional[int]=None, 1048 storage_classes_desired: Optional[List[str]]=None, 1049 put_threads: Optional[int]=None): 1050 """Initialize a Collection object 1051 1052 Arguments: 1053 1054 * manifest_locator_or_text: str | None --- This string can contain a 1055 collection manifest text, portable data hash, or UUID. When given a 1056 portable data hash or UUID, this instance will load a collection 1057 record from the API server. Otherwise, this instance will represent a 1058 new collection without an API server record. The default value `None` 1059 instantiates a new collection with an empty manifest. 1060 1061 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The 1062 Arvados API client object this instance uses to make requests. If 1063 none is given, this instance creates its own client using the 1064 settings from `apiconfig` (see below). If your client instantiates 1065 many Collection objects, you can help limit memory utilization by 1066 calling `arvados.api.api` to construct an 1067 `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client` 1068 for every Collection. 1069 1070 * keep_client: arvados.keep.KeepClient | None --- The Keep client 1071 object this instance uses to make requests. If none is given, this 1072 instance creates its own client using its `api_client`. 1073 1074 * num_retries: int --- The number of times that client requests are 1075 retried. Default 10. 1076 1077 * parent: arvados.collection.Collection | None --- The parent Collection 1078 object of this instance, if any. This argument is primarily used by 1079 other Collection methods; user client code shouldn't need to use it. 1080 1081 * apiconfig: Mapping[str, str] | None --- A mapping with entries for 1082 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally 1083 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the 1084 Collection object constructs one from these settings. If no 1085 mapping is provided, calls `arvados.config.settings` to get these 1086 parameters from user configuration. 1087 1088 * block_manager: arvados.arvfile._BlockManager | None --- The 1089 _BlockManager object used by this instance to coordinate reading 1090 and writing Keep data blocks. If none is given, this instance 1091 constructs its own. This argument is primarily used by other 1092 Collection methods; user client code shouldn't need to use it. 1093 1094 * replication_desired: int | None --- This controls both the value of 1095 the `replication_desired` field on API collection records saved by 1096 this class, as well as the number of Keep services that the object 1097 writes new data blocks to. If none is given, uses the default value 1098 configured for the cluster. 1099 1100 * storage_classes_desired: list[str] | None --- This controls both 1101 the value of the `storage_classes_desired` field on API collection 1102 records saved by this class, as well as selecting which specific 1103 Keep services the object writes new data blocks to. If none is 1104 given, defaults to an empty list. 1105 1106 * put_threads: int | None --- The number of threads to run 1107 simultaneously to upload data blocks to Keep. This value is used when 1108 building a new `block_manager`. It is unused when a `block_manager` 1109 is provided. 1110 """ 1111 1112 if storage_classes_desired and type(storage_classes_desired) is not list: 1113 raise errors.ArgumentError("storage_classes_desired must be list type.") 1114 1115 super(Collection, self).__init__(parent) 1116 self._api_client = api_client 1117 self._keep_client = keep_client 1118 1119 # Use the keep client from ThreadSafeAPIClient 1120 if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient): 1121 self._keep_client = self._api_client.keep 1122 1123 self._block_manager = block_manager 1124 self.replication_desired = replication_desired 1125 self._storage_classes_desired = storage_classes_desired 1126 self.put_threads = put_threads 1127 1128 if apiconfig: 1129 self._config = apiconfig 1130 else: 1131 self._config = config.settings() 1132 1133 self.num_retries = num_retries 1134 self._manifest_locator = None 1135 self._manifest_text = None 1136 self._portable_data_hash = None 1137 self._api_response = None 1138 self._past_versions = set() 1139 1140 self.lock = threading.RLock() 1141 self.events = None 1142 1143 if manifest_locator_or_text: 1144 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1145 self._manifest_locator = manifest_locator_or_text 1146 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1147 self._manifest_locator = manifest_locator_or_text 1148 if not self._has_local_collection_uuid(): 1149 self._has_remote_blocks = True 1150 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1151 self._manifest_text = manifest_locator_or_text 1152 if '+R' in self._manifest_text: 1153 self._has_remote_blocks = True 1154 else: 1155 raise errors.ArgumentError( 1156 "Argument to CollectionReader is not a manifest or a collection UUID") 1157 1158 try: 1159 self._populate() 1160 except errors.SyntaxError as e: 1161 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
Initialize a Collection object
Arguments:
manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value
None
instantiates a new collection with an empty manifest.api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from
apiconfig
(see below). If your client instantiates many Collection objects, you can help limit memory utilization by callingarvados.api.api
to construct anarvados.api.ThreadSafeAPIClient
, and use that as theapi_client
for every Collection.keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its
api_client
.num_retries: int — The number of times that client requests are retried. Default 10.
parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
apiconfig: Mapping[str, str] | None — A mapping with entries for
ARVADOS_API_HOST
,ARVADOS_API_TOKEN
, and optionallyARVADOS_API_HOST_INSECURE
. When noapi_client
is provided, the Collection object constructs one from these settings. If no mapping is provided, callsarvados.config.settings
to get these parameters from user configuration.block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
replication_desired: int | None — This controls both the value of the
replication_desired
field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.storage_classes_desired: list[str] | None — This controls both the value of the
storage_classes_desired
field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new
block_manager
. It is unused when ablock_manager
is provided.
1163 def storage_classes_desired(self) -> List[str]: 1164 """Get this collection's `storage_classes_desired` value""" 1165 return self._storage_classes_desired or []
Get this collection’s storage_classes_desired
value
Get this collection’s root collection object
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the source Collection object.
1170 def get_properties(self) -> Properties: 1171 """Get this collection's properties 1172 1173 This method always returns a dict. If this collection object does not 1174 have an associated API record, or that record does not have any 1175 properties set, this method returns an empty dict. 1176 """ 1177 if self._api_response and self._api_response["properties"]: 1178 return self._api_response["properties"] 1179 else: 1180 return {}
Get this collection’s properties
This method always returns a dict. If this collection object does not have an associated API record, or that record does not have any properties set, this method returns an empty dict.
1182 def get_trash_at(self) -> Optional[datetime.datetime]: 1183 """Get this collection's `trash_at` field 1184 1185 This method parses the `trash_at` field of the collection's API 1186 record and returns a datetime from it. If that field is not set, or 1187 this collection object does not have an associated API record, 1188 returns None. 1189 """ 1190 if self._api_response and self._api_response["trash_at"]: 1191 try: 1192 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1193 except ValueError: 1194 return None 1195 else: 1196 return None
Get this collection’s trash_at
field
This method parses the trash_at
field of the collection’s API
record and returns a datetime from it. If that field is not set, or
this collection object does not have an associated API record,
returns None.
Get the name of the manifest stream represented by this collection
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the name of the stream you opened.
Indicate whether this collection object can be modified
This method returns False
if this object is a CollectionReader
,
else True
.
1204 @synchronized 1205 def known_past_version( 1206 self, 1207 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] 1208 ) -> bool: 1209 """Indicate whether an API record for this collection has been seen before 1210 1211 As this collection object loads records from the API server, it records 1212 their `modified_at` and `portable_data_hash` fields. This method accepts 1213 a 2-tuple with values for those fields, and returns `True` if the 1214 combination was previously loaded. 1215 """ 1216 return modified_at_and_portable_data_hash in self._past_versions
Indicate whether an API record for this collection has been seen before
As this collection object loads records from the API server, it records
their modified_at
and portable_data_hash
fields. This method accepts
a 2-tuple with values for those fields, and returns True
if the
combination was previously loaded.
1218 @synchronized 1219 @retry_method 1220 def update( 1221 self, 1222 other: Optional['Collection']=None, 1223 num_retries: Optional[int]=None, 1224 ) -> None: 1225 """Merge another collection's contents into this one 1226 1227 This method compares the manifest of this collection instance with 1228 another, then updates this instance's manifest with changes from the 1229 other, renaming files to flag conflicts where necessary. 1230 1231 When called without any arguments, this method reloads the collection's 1232 API record, and updates this instance with any changes that have 1233 appeared server-side. If this instance does not have a corresponding 1234 API record, this method raises `arvados.errors.ArgumentError`. 1235 1236 Arguments: 1237 1238 * other: arvados.collection.Collection | None --- The collection 1239 whose contents should be merged into this instance. When not 1240 provided, this method reloads this collection's API record and 1241 constructs a Collection object from it. If this instance does not 1242 have a corresponding API record, this method raises 1243 `arvados.errors.ArgumentError`. 1244 1245 * num_retries: int | None --- The number of times to retry reloading 1246 the collection's API record from the API server. If not specified, 1247 uses the `num_retries` provided when this instance was constructed. 1248 """ 1249 if other is None: 1250 if self._manifest_locator is None: 1251 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1252 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1253 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and 1254 response.get("portable_data_hash") != self.portable_data_hash()): 1255 # The record on the server is different from our current one, but we've seen it before, 1256 # so ignore it because it's already been merged. 1257 # However, if it's the same as our current record, proceed with the update, because we want to update 1258 # our tokens. 1259 return 1260 else: 1261 self._remember_api_response(response) 1262 other = CollectionReader(response["manifest_text"]) 1263 baseline = CollectionReader(self._manifest_text) 1264 self.apply(baseline.diff(other)) 1265 self._manifest_text = self.manifest_text()
Merge another collection’s contents into this one
This method compares the manifest of this collection instance with another, then updates this instance’s manifest with changes from the other, renaming files to flag conflicts where necessary.
When called without any arguments, this method reloads the collection’s
API record, and updates this instance with any changes that have
appeared server-side. If this instance does not have a corresponding
API record, this method raises arvados.errors.ArgumentError
.
Arguments:
other: Collection | None — The collection whose contents should be merged into this instance. When not provided, this method reloads this collection’s API record and constructs a Collection object from it. If this instance does not have a corresponding API record, this method raises
arvados.errors.ArgumentError
.num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the
num_retries
provided when this instance was constructed.
1353 def stop_threads(self) -> None: 1354 """Stop background Keep upload/download threads""" 1355 if self._block_manager is not None: 1356 self._block_manager.stop_threads()
Stop background Keep upload/download threads
1358 @synchronized 1359 def manifest_locator(self) -> Optional[str]: 1360 """Get this collection's manifest locator, if any 1361 1362 * If this collection instance is associated with an API record with a 1363 UUID, return that. 1364 * Otherwise, if this collection instance was loaded from an API record 1365 by portable data hash, return that. 1366 * Otherwise, return `None`. 1367 """ 1368 return self._manifest_locator
Get this collection’s manifest locator, if any
- If this collection instance is associated with an API record with a UUID, return that.
- Otherwise, if this collection instance was loaded from an API record by portable data hash, return that.
- Otherwise, return
None
.
1370 @synchronized 1371 def clone( 1372 self, 1373 new_parent: Optional['Collection']=None, 1374 new_name: Optional[str]=None, 1375 readonly: bool=False, 1376 new_config: Optional[Mapping[str, str]]=None, 1377 ) -> 'Collection': 1378 """Create a Collection object with the same contents as this instance 1379 1380 This method creates a new Collection object with contents that match 1381 this instance's. The new collection will not be associated with any API 1382 record. 1383 1384 Arguments: 1385 1386 * new_parent: arvados.collection.Collection | None --- This value is 1387 passed to the new Collection's constructor as the `parent` 1388 argument. 1389 1390 * new_name: str | None --- This value is unused. 1391 1392 * readonly: bool --- If this value is true, this method constructs and 1393 returns a `CollectionReader`. Otherwise, it returns a mutable 1394 `Collection`. Default `False`. 1395 1396 * new_config: Mapping[str, str] | None --- This value is passed to the 1397 new Collection's constructor as `apiconfig`. If no value is provided, 1398 defaults to the configuration passed to this instance's constructor. 1399 """ 1400 if new_config is None: 1401 new_config = self._config 1402 if readonly: 1403 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1404 else: 1405 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1406 1407 newcollection._clonefrom(self) 1408 return newcollection
Create a Collection object with the same contents as this instance
This method creates a new Collection object with contents that match this instance’s. The new collection will not be associated with any API record.
Arguments:
new_parent: Collection | None — This value is passed to the new Collection’s constructor as the
parent
argument.new_name: str | None — This value is unused.
readonly: bool — If this value is true, this method constructs and returns a
CollectionReader
. Otherwise, it returns a mutableCollection
. DefaultFalse
.new_config: Mapping[str, str] | None — This value is passed to the new Collection’s constructor as
apiconfig
. If no value is provided, defaults to the configuration passed to this instance’s constructor.
1410 @synchronized 1411 def api_response(self) -> Optional[Dict[str, Any]]: 1412 """Get this instance's associated API record 1413 1414 If this Collection instance has an associated API record, return it. 1415 Otherwise, return `None`. 1416 """ 1417 return self._api_response
Get this instance’s associated API record
If this Collection instance has an associated API record, return it.
Otherwise, return None
.
1419 def find_or_create( 1420 self, 1421 path: str, 1422 create_type: CreateType, 1423 ) -> CollectionItem: 1424 if path == ".": 1425 return self 1426 else: 1427 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
Get the item at the given path, creating it if necessary
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method creates a new object and returns
it, creating parent streams as needed. The type of object created is
determined by the value of create_type
.
Arguments:
path: str — The path to find or create within this collection.
create_type: Literal[COLLECTION, FILE] — The type of object to create at
path
if one does not exist. PassingCOLLECTION
creates a stream and returns the correspondingSubcollection
. PassingFILE
creates a new file and returns the correspondingarvados.arvfile.ArvadosFile
.
1429 def find(self, path: str) -> CollectionItem: 1430 if path == ".": 1431 return self 1432 else: 1433 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
Get the item at the given path
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method raises NotADirectoryError
.
Arguments:
- path: str — The path to find or create within this collection.
1435 def remove(self, path: str, recursive: bool=False) -> None: 1436 if path == ".": 1437 raise errors.ArgumentError("Cannot remove '.'") 1438 else: 1439 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
Remove the file or stream at path
Arguments:
path: str — The path of the item to remove from the collection
recursive: bool — Controls the method’s behavior if
path
refers to a nonempty stream. IfFalse
(the default), this method raisesOSError
with errnoENOTEMPTY
. IfTrue
, this method removes all items under the stream.
1441 @must_be_writable 1442 @synchronized 1443 @retry_method 1444 def save( 1445 self, 1446 properties: Optional[Properties]=None, 1447 storage_classes: Optional[StorageClasses]=None, 1448 trash_at: Optional[datetime.datetime]=None, 1449 merge: bool=True, 1450 num_retries: Optional[int]=None, 1451 preserve_version: bool=False, 1452 ) -> str: 1453 """Save collection to an existing API record 1454 1455 This method updates the instance's corresponding API record to match 1456 the instance's state. If this instance does not have a corresponding API 1457 record yet, raises `AssertionError`. (To create a new API record, use 1458 `Collection.save_new`.) This method returns the saved collection 1459 manifest. 1460 1461 Arguments: 1462 1463 * properties: dict[str, Any] | None --- If provided, the API record will 1464 be updated with these properties. Note this will completely replace 1465 any existing properties. 1466 1467 * storage_classes: list[str] | None --- If provided, the API record will 1468 be updated with this value in the `storage_classes_desired` field. 1469 This value will also be saved on the instance and used for any 1470 changes that follow. 1471 1472 * trash_at: datetime.datetime | None --- If provided, the API record 1473 will be updated with this value in the `trash_at` field. 1474 1475 * merge: bool --- If `True` (the default), this method will first 1476 reload this collection's API record, and merge any new contents into 1477 this instance before saving changes. See `Collection.update` for 1478 details. 1479 1480 * num_retries: int | None --- The number of times to retry reloading 1481 the collection's API record from the API server. If not specified, 1482 uses the `num_retries` provided when this instance was constructed. 1483 1484 * preserve_version: bool --- This value will be passed to directly 1485 to the underlying API call. If `True`, the Arvados API will 1486 preserve the versions of this collection both immediately before 1487 and after the update. If `True` when the API server is not 1488 configured with collection versioning, this method raises 1489 `arvados.errors.ArgumentError`. 1490 """ 1491 if properties and type(properties) is not dict: 1492 raise errors.ArgumentError("properties must be dictionary type.") 1493 1494 if storage_classes and type(storage_classes) is not list: 1495 raise errors.ArgumentError("storage_classes must be list type.") 1496 if storage_classes: 1497 self._storage_classes_desired = storage_classes 1498 1499 if trash_at and type(trash_at) is not datetime.datetime: 1500 raise errors.ArgumentError("trash_at must be datetime type.") 1501 1502 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1503 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1504 1505 body={} 1506 if properties: 1507 body["properties"] = properties 1508 if self.storage_classes_desired(): 1509 body["storage_classes_desired"] = self.storage_classes_desired() 1510 if trash_at: 1511 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1512 body["trash_at"] = t 1513 if preserve_version: 1514 body["preserve_version"] = preserve_version 1515 1516 if not self.committed(): 1517 if self._has_remote_blocks: 1518 # Copy any remote blocks to the local cluster. 1519 self._copy_remote_blocks(remote_blocks={}) 1520 self._has_remote_blocks = False 1521 if not self._has_collection_uuid(): 1522 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") 1523 elif not self._has_local_collection_uuid(): 1524 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.") 1525 1526 self._my_block_manager().commit_all() 1527 1528 if merge: 1529 self.update() 1530 1531 text = self.manifest_text(strip=False) 1532 body['manifest_text'] = text 1533 1534 self._remember_api_response(self._my_api().collections().update( 1535 uuid=self._manifest_locator, 1536 body=body 1537 ).execute(num_retries=num_retries)) 1538 self._manifest_text = self._api_response["manifest_text"] 1539 self._portable_data_hash = self._api_response["portable_data_hash"] 1540 self.set_committed(True) 1541 elif body: 1542 self._remember_api_response(self._my_api().collections().update( 1543 uuid=self._manifest_locator, 1544 body=body 1545 ).execute(num_retries=num_retries)) 1546 1547 return self._manifest_text
Save collection to an existing API record
This method updates the instance’s corresponding API record to match
the instance’s state. If this instance does not have a corresponding API
record yet, raises AssertionError
. (To create a new API record, use
Collection.save_new
.) This method returns the saved collection
manifest.
Arguments:
properties: dict[str, Any] | None — If provided, the API record will be updated with these properties. Note this will completely replace any existing properties.
storage_classes: list[str] | None — If provided, the API record will be updated with this value in the
storage_classes_desired
field. This value will also be saved on the instance and used for any changes that follow.trash_at: datetime.datetime | None — If provided, the API record will be updated with this value in the
trash_at
field.merge: bool — If
True
(the default), this method will first reload this collection’s API record, and merge any new contents into this instance before saving changes. SeeCollection.update
for details.num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the
num_retries
provided when this instance was constructed.preserve_version: bool — This value will be passed to directly to the underlying API call. If
True
, the Arvados API will preserve the versions of this collection both immediately before and after the update. IfTrue
when the API server is not configured with collection versioning, this method raisesarvados.errors.ArgumentError
.
1550 @must_be_writable 1551 @synchronized 1552 @retry_method 1553 def save_new( 1554 self, 1555 name: Optional[str]=None, 1556 create_collection_record: bool=True, 1557 owner_uuid: Optional[str]=None, 1558 properties: Optional[Properties]=None, 1559 storage_classes: Optional[StorageClasses]=None, 1560 trash_at: Optional[datetime.datetime]=None, 1561 ensure_unique_name: bool=False, 1562 num_retries: Optional[int]=None, 1563 preserve_version: bool=False, 1564 ): 1565 """Save collection to a new API record 1566 1567 This method finishes uploading new data blocks and (optionally) 1568 creates a new API collection record with the provided data. If a new 1569 record is created, this instance becomes associated with that record 1570 for future updates like `save()`. This method returns the saved 1571 collection manifest. 1572 1573 Arguments: 1574 1575 * name: str | None --- The `name` field to use on the new collection 1576 record. If not specified, a generic default name is generated. 1577 1578 * create_collection_record: bool --- If `True` (the default), creates a 1579 collection record on the API server. If `False`, the method finishes 1580 all data uploads and only returns the resulting collection manifest 1581 without sending it to the API server. 1582 1583 * owner_uuid: str | None --- The `owner_uuid` field to use on the 1584 new collection record. 1585 1586 * properties: dict[str, Any] | None --- The `properties` field to use on 1587 the new collection record. 1588 1589 * storage_classes: list[str] | None --- The 1590 `storage_classes_desired` field to use on the new collection record. 1591 1592 * trash_at: datetime.datetime | None --- The `trash_at` field to use 1593 on the new collection record. 1594 1595 * ensure_unique_name: bool --- This value is passed directly to the 1596 Arvados API when creating the collection record. If `True`, the API 1597 server may modify the submitted `name` to ensure the collection's 1598 `name`+`owner_uuid` combination is unique. If `False` (the default), 1599 if a collection already exists with this same `name`+`owner_uuid` 1600 combination, creating a collection record will raise a validation 1601 error. 1602 1603 * num_retries: int | None --- The number of times to retry reloading 1604 the collection's API record from the API server. If not specified, 1605 uses the `num_retries` provided when this instance was constructed. 1606 1607 * preserve_version: bool --- This value will be passed to directly 1608 to the underlying API call. If `True`, the Arvados API will 1609 preserve the versions of this collection both immediately before 1610 and after the update. If `True` when the API server is not 1611 configured with collection versioning, this method raises 1612 `arvados.errors.ArgumentError`. 1613 """ 1614 if properties and type(properties) is not dict: 1615 raise errors.ArgumentError("properties must be dictionary type.") 1616 1617 if storage_classes and type(storage_classes) is not list: 1618 raise errors.ArgumentError("storage_classes must be list type.") 1619 1620 if trash_at and type(trash_at) is not datetime.datetime: 1621 raise errors.ArgumentError("trash_at must be datetime type.") 1622 1623 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1624 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1625 1626 if self._has_remote_blocks: 1627 # Copy any remote blocks to the local cluster. 1628 self._copy_remote_blocks(remote_blocks={}) 1629 self._has_remote_blocks = False 1630 1631 if storage_classes: 1632 self._storage_classes_desired = storage_classes 1633 1634 self._my_block_manager().commit_all() 1635 text = self.manifest_text(strip=False) 1636 1637 if create_collection_record: 1638 if name is None: 1639 name = "New collection" 1640 ensure_unique_name = True 1641 1642 body = {"manifest_text": text, 1643 "name": name, 1644 "replication_desired": self.replication_desired} 1645 if owner_uuid: 1646 body["owner_uuid"] = owner_uuid 1647 if properties: 1648 body["properties"] = properties 1649 if self.storage_classes_desired(): 1650 body["storage_classes_desired"] = self.storage_classes_desired() 1651 if trash_at: 1652 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1653 body["trash_at"] = t 1654 if preserve_version: 1655 body["preserve_version"] = preserve_version 1656 1657 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) 1658 text = self._api_response["manifest_text"] 1659 1660 self._manifest_locator = self._api_response["uuid"] 1661 self._portable_data_hash = self._api_response["portable_data_hash"] 1662 1663 self._manifest_text = text 1664 self.set_committed(True) 1665 1666 return text
Save collection to a new API record
This method finishes uploading new data blocks and (optionally)
creates a new API collection record with the provided data. If a new
record is created, this instance becomes associated with that record
for future updates like save()
. This method returns the saved
collection manifest.
Arguments:
name: str | None — The
name
field to use on the new collection record. If not specified, a generic default name is generated.create_collection_record: bool — If
True
(the default), creates a collection record on the API server. IfFalse
, the method finishes all data uploads and only returns the resulting collection manifest without sending it to the API server.owner_uuid: str | None — The
owner_uuid
field to use on the new collection record.properties: dict[str, Any] | None — The
properties
field to use on the new collection record.storage_classes: list[str] | None — The
storage_classes_desired
field to use on the new collection record.trash_at: datetime.datetime | None — The
trash_at
field to use on the new collection record.ensure_unique_name: bool — This value is passed directly to the Arvados API when creating the collection record. If
True
, the API server may modify the submittedname
to ensure the collection’sname
+owner_uuid
combination is unique. IfFalse
(the default), if a collection already exists with this samename
+owner_uuid
combination, creating a collection record will raise a validation error.num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the
num_retries
provided when this instance was constructed.preserve_version: bool — This value will be passed to directly to the underlying API call. If
True
, the Arvados API will preserve the versions of this collection both immediately before and after the update. IfTrue
when the API server is not configured with collection versioning, this method raisesarvados.errors.ArgumentError
.
1749 @synchronized 1750 def notify( 1751 self, 1752 event: ChangeType, 1753 collection: 'RichCollectionBase', 1754 name: str, 1755 item: CollectionItem, 1756 ) -> None: 1757 if self._callback: 1758 self._callback(event, collection, name, item)
Notify any subscribed callback about a change to this collection
If a callback has been registered with RichCollectionBase.subscribe
,
it will be called with information about a change to this collection.
Then this notification will be propagated to this collection’s root.
Arguments:
event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.
collection: RichCollectionBase — The collection that was modified.
name: str — The name of the file or stream within
collection
that was modified.item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at
name
withincollection
.
1761class Subcollection(RichCollectionBase): 1762 """Read and manipulate a stream/directory within an Arvados collection 1763 1764 This class represents a single stream (like a directory) within an Arvados 1765 `Collection`. It is returned by `Collection.find` and provides the same API. 1766 Operations that work on the API collection record propagate to the parent 1767 `Collection` object. 1768 """ 1769 1770 def __init__(self, parent, name): 1771 super(Subcollection, self).__init__(parent) 1772 self.lock = self.root_collection().lock 1773 self._manifest_text = None 1774 self.name = name 1775 self.num_retries = parent.num_retries 1776 1777 def root_collection(self) -> 'Collection': 1778 return self.parent.root_collection() 1779 1780 def writable(self) -> bool: 1781 return self.root_collection().writable() 1782 1783 def _my_api(self): 1784 return self.root_collection()._my_api() 1785 1786 def _my_keep(self): 1787 return self.root_collection()._my_keep() 1788 1789 def _my_block_manager(self): 1790 return self.root_collection()._my_block_manager() 1791 1792 def stream_name(self) -> str: 1793 return os.path.join(self.parent.stream_name(), self.name) 1794 1795 @synchronized 1796 def clone( 1797 self, 1798 new_parent: Optional['Collection']=None, 1799 new_name: Optional[str]=None, 1800 ) -> 'Subcollection': 1801 c = Subcollection(new_parent, new_name) 1802 c._clonefrom(self) 1803 return c 1804 1805 @must_be_writable 1806 @synchronized 1807 def _reparent(self, newparent, newname): 1808 self.set_committed(False) 1809 self.flush() 1810 self.parent.remove(self.name, recursive=True) 1811 self.parent = newparent 1812 self.name = newname 1813 self.lock = self.parent.root_collection().lock 1814 1815 @synchronized 1816 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 1817 """Encode empty directories by using an \056-named (".") empty file""" 1818 if len(self._items) == 0: 1819 return "%s %s 0:0:\\056\n" % ( 1820 streams.escape(stream_name), config.EMPTY_BLOCK_LOCATOR) 1821 return super(Subcollection, self)._get_manifest_text(stream_name, 1822 strip, normalize, 1823 only_committed)
Read and manipulate a stream/directory within an Arvados collection
This class represents a single stream (like a directory) within an Arvados
Collection
. It is returned by Collection.find
and provides the same API.
Operations that work on the API collection record propagate to the parent
Collection
object.
Get this collection’s root collection object
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the source Collection object.
Indicate whether this collection object can be modified
This method returns False
if this object is a CollectionReader
,
else True
.
Get the name of the manifest stream represented by this collection
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the name of the stream you opened.
Inherited Members
1826class CollectionReader(Collection): 1827 """Read-only `Collection` subclass 1828 1829 This class will never create or update any API collection records. You can 1830 use this class for additional code safety when you only need to read 1831 existing collections. 1832 """ 1833 def __init__(self, manifest_locator_or_text, *args, **kwargs): 1834 self._in_init = True 1835 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs) 1836 self._in_init = False 1837 1838 # Forego any locking since it should never change once initialized. 1839 self.lock = NoopLock() 1840 1841 # Backwards compatability with old CollectionReader 1842 # all_streams() and all_files() 1843 self._streams = None 1844 1845 def writable(self) -> bool: 1846 return self._in_init
Read-only Collection
subclass
This class will never create or update any API collection records. You can use this class for additional code safety when you only need to read existing collections.
1833 def __init__(self, manifest_locator_or_text, *args, **kwargs): 1834 self._in_init = True 1835 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs) 1836 self._in_init = False 1837 1838 # Forego any locking since it should never change once initialized. 1839 self.lock = NoopLock() 1840 1841 # Backwards compatability with old CollectionReader 1842 # all_streams() and all_files() 1843 self._streams = None
Initialize a Collection object
Arguments:
manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value
None
instantiates a new collection with an empty manifest.api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from
apiconfig
(see below). If your client instantiates many Collection objects, you can help limit memory utilization by callingarvados.api.api
to construct anarvados.api.ThreadSafeAPIClient
, and use that as theapi_client
for every Collection.keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its
api_client
.num_retries: int — The number of times that client requests are retried. Default 10.
parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
apiconfig: Mapping[str, str] | None — A mapping with entries for
ARVADOS_API_HOST
,ARVADOS_API_TOKEN
, and optionallyARVADOS_API_HOST_INSECURE
. When noapi_client
is provided, the Collection object constructs one from these settings. If no mapping is provided, callsarvados.config.settings
to get these parameters from user configuration.block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
replication_desired: int | None — This controls both the value of the
replication_desired
field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.storage_classes_desired: list[str] | None — This controls both the value of the
storage_classes_desired
field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new
block_manager
. It is unused when ablock_manager
is provided.
Indicate whether this collection object can be modified
This method returns False
if this object is a CollectionReader
,
else True
.