arvados.collection
Tools to work with Arvados collections
This module provides high-level interfaces to create, read, and update
Arvados collections. Most users will want to instantiate `Collection`
objects, and use methods like `Collection.open` and `Collection.mkdirs` to
read and write data in the collection. Refer to the Arvados Python SDK
cookbook for an introduction to using the Collection class:
https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4"""Tools to work with Arvados collections 5 6This module provides high-level interfaces to create, read, and update 7Arvados collections. Most users will want to instantiate `Collection` 8objects, and use methods like `Collection.open` and `Collection.mkdirs` to 9read and write data in the collection. Refer to the Arvados Python SDK 10cookbook for [an introduction to using the Collection class][cookbook]. 11 12[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 13""" 14 15import ciso8601 16import datetime 17import errno 18import functools 19import hashlib 20import io 21import logging 22import os 23import re 24import sys 25import threading 26import time 27 28from collections import deque 29from stat import * 30 31from ._internal import streams 32from .api import ThreadSafeAPIClient 33from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock, ADD, DEL, MOD, TOK, WRITE 34from .keep import KeepLocator, KeepClient 35import arvados.config as config 36import arvados.errors as errors 37import arvados.util 38import arvados.events as events 39from arvados.retry import retry_method 40 41from typing import ( 42 Any, 43 Callable, 44 Dict, 45 IO, 46 Iterator, 47 List, 48 Mapping, 49 Optional, 50 Tuple, 51 Union, 52) 53 54if sys.version_info < (3, 8): 55 from typing_extensions import Literal 56else: 57 from typing import Literal 58 59_logger = logging.getLogger('arvados.collection') 60 61 62FILE = "file" 63"""`create_type` value for `Collection.find_or_create`""" 64COLLECTION = "collection" 65"""`create_type` value for `Collection.find_or_create`""" 66 67ChangeList = List[Union[ 68 Tuple[Literal[ADD, DEL], str, 'Collection'], 69 Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'], 70]] 71ChangeType = Literal[ADD, DEL, MOD, TOK] 
72CollectionItem = Union[ArvadosFile, 'Collection'] 73ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object] 74CreateType = Literal[COLLECTION, FILE] 75Properties = Dict[str, Any] 76StorageClasses = List[str] 77 78class CollectionBase(object): 79 """Abstract base class for Collection classes 80 81 .. ATTENTION:: Internal 82 This class is meant to be used by other parts of the SDK. User code 83 should instantiate or subclass `Collection` or one of its subclasses 84 directly. 85 """ 86 87 def __enter__(self): 88 """Enter a context block with this collection instance""" 89 return self 90 91 def __exit__(self, exc_type, exc_value, traceback): 92 """Exit a context block with this collection instance""" 93 pass 94 95 def _my_keep(self): 96 if self._keep_client is None: 97 self._keep_client = KeepClient(api_client=self._api_client, 98 num_retries=self.num_retries) 99 return self._keep_client 100 101 def stripped_manifest(self) -> str: 102 """Create a copy of the collection manifest with only size hints 103 104 This method returns a string with the current collection's manifest 105 text with all non-portable locator hints like permission hints and 106 remote cluster hints removed. The only hints in the returned manifest 107 will be size hints. 
108 """ 109 raw = self.manifest_text() 110 clean = [] 111 for line in raw.split("\n"): 112 fields = line.split() 113 if fields: 114 clean_fields = fields[:1] + [ 115 (re.sub(r'\+[^\d][^\+]*', '', x) 116 if re.match(arvados.util.keep_locator_pattern, x) 117 else x) 118 for x in fields[1:]] 119 clean += [' '.join(clean_fields), "\n"] 120 return ''.join(clean) 121 122 123class _WriterFile(_FileLikeObjectBase): 124 def __init__(self, coll_writer, name): 125 super(_WriterFile, self).__init__(name, 'wb') 126 self.dest = coll_writer 127 128 def close(self): 129 super(_WriterFile, self).close() 130 self.dest.finish_current_file() 131 132 @_FileLikeObjectBase._before_close 133 def write(self, data): 134 self.dest.write(data) 135 136 @_FileLikeObjectBase._before_close 137 def writelines(self, seq): 138 for data in seq: 139 self.write(data) 140 141 @_FileLikeObjectBase._before_close 142 def flush(self): 143 self.dest.flush_data() 144 145 146class RichCollectionBase(CollectionBase): 147 """Base class for Collection classes 148 149 .. ATTENTION:: Internal 150 This class is meant to be used by other parts of the SDK. User code 151 should instantiate or subclass `Collection` or one of its subclasses 152 directly. 153 """ 154 155 def __init__(self, parent=None): 156 self.parent = parent 157 self._committed = False 158 self._has_remote_blocks = False 159 self._callback = None 160 self._items = {} 161 162 def _my_api(self): 163 raise NotImplementedError() 164 165 def _my_keep(self): 166 raise NotImplementedError() 167 168 def _my_block_manager(self): 169 raise NotImplementedError() 170 171 def writable(self) -> bool: 172 """Indicate whether this collection object can be modified 173 174 This method returns `False` if this object is a `CollectionReader`, 175 else `True`. 
176 """ 177 raise NotImplementedError() 178 179 def root_collection(self) -> 'Collection': 180 """Get this collection's root collection object 181 182 If you open a subcollection with `Collection.find`, calling this method 183 on that subcollection returns the source Collection object. 184 """ 185 raise NotImplementedError() 186 187 def stream_name(self) -> str: 188 """Get the name of the manifest stream represented by this collection 189 190 If you open a subcollection with `Collection.find`, calling this method 191 on that subcollection returns the name of the stream you opened. 192 """ 193 raise NotImplementedError() 194 195 @synchronized 196 def has_remote_blocks(self) -> bool: 197 """Indiciate whether the collection refers to remote data 198 199 Returns `True` if the collection manifest includes any Keep locators 200 with a remote hint (`+R`), else `False`. 201 """ 202 if self._has_remote_blocks: 203 return True 204 for item in self: 205 if self[item].has_remote_blocks(): 206 return True 207 return False 208 209 @synchronized 210 def set_has_remote_blocks(self, val: bool) -> None: 211 """Cache whether this collection refers to remote blocks 212 213 .. ATTENTION:: Internal 214 This method is only meant to be used by other Collection methods. 215 216 Set this collection's cached "has remote blocks" flag to the given 217 value. 218 """ 219 self._has_remote_blocks = val 220 if self.parent: 221 self.parent.set_has_remote_blocks(val) 222 223 @must_be_writable 224 @synchronized 225 def find_or_create( 226 self, 227 path: str, 228 create_type: CreateType, 229 ) -> CollectionItem: 230 """Get the item at the given path, creating it if necessary 231 232 If `path` refers to a stream in this collection, returns a 233 corresponding `Subcollection` object. If `path` refers to a file in 234 this collection, returns a corresponding 235 `arvados.arvfile.ArvadosFile` object. 
If `path` does not exist in 236 this collection, then this method creates a new object and returns 237 it, creating parent streams as needed. The type of object created is 238 determined by the value of `create_type`. 239 240 Arguments: 241 242 * path: str --- The path to find or create within this collection. 243 244 * create_type: Literal[COLLECTION, FILE] --- The type of object to 245 create at `path` if one does not exist. Passing `COLLECTION` 246 creates a stream and returns the corresponding 247 `Subcollection`. Passing `FILE` creates a new file and returns the 248 corresponding `arvados.arvfile.ArvadosFile`. 249 """ 250 pathcomponents = path.split("/", 1) 251 if pathcomponents[0]: 252 item = self._items.get(pathcomponents[0]) 253 if len(pathcomponents) == 1: 254 if item is None: 255 # create new file 256 if create_type == COLLECTION: 257 item = Subcollection(self, pathcomponents[0]) 258 else: 259 item = ArvadosFile(self, pathcomponents[0]) 260 self._items[pathcomponents[0]] = item 261 self.set_committed(False) 262 self.notify(ADD, self, pathcomponents[0], item) 263 return item 264 else: 265 if item is None: 266 # create new collection 267 item = Subcollection(self, pathcomponents[0]) 268 self._items[pathcomponents[0]] = item 269 self.set_committed(False) 270 self.notify(ADD, self, pathcomponents[0], item) 271 if isinstance(item, RichCollectionBase): 272 return item.find_or_create(pathcomponents[1], create_type) 273 else: 274 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 275 else: 276 return self 277 278 @synchronized 279 def find(self, path: str) -> CollectionItem: 280 """Get the item at the given path 281 282 If `path` refers to a stream in this collection, returns a 283 corresponding `Subcollection` object. If `path` refers to a file in 284 this collection, returns a corresponding 285 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 286 this collection, then this method raises `NotADirectoryError`. 
287 288 Arguments: 289 290 * path: str --- The path to find or create within this collection. 291 """ 292 if not path: 293 raise errors.ArgumentError("Parameter 'path' is empty.") 294 295 pathcomponents = path.split("/", 1) 296 if pathcomponents[0] == '': 297 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 298 299 item = self._items.get(pathcomponents[0]) 300 if item is None: 301 return None 302 elif len(pathcomponents) == 1: 303 return item 304 else: 305 if isinstance(item, RichCollectionBase): 306 if pathcomponents[1]: 307 return item.find(pathcomponents[1]) 308 else: 309 return item 310 else: 311 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 312 313 @synchronized 314 def mkdirs(self, path: str) -> 'Subcollection': 315 """Create and return a subcollection at `path` 316 317 If `path` exists within this collection, raises `FileExistsError`. 318 Otherwise, creates a stream at that path and returns the 319 corresponding `Subcollection`. 320 """ 321 if self.find(path) != None: 322 raise IOError(errno.EEXIST, "Directory or file exists", path) 323 324 return self.find_or_create(path, COLLECTION) 325 326 def open( 327 self, 328 path: str, 329 mode: str="r", 330 encoding: Optional[str]=None 331 ) -> IO: 332 """Open a file-like object within the collection 333 334 This method returns a file-like object that can read and/or write the 335 file located at `path` within the collection. If you attempt to write 336 a `path` that does not exist, the file is created with `find_or_create`. 337 If the file cannot be opened for any other reason, this method raises 338 `OSError` with an appropriate errno. 339 340 Arguments: 341 342 * path: str --- The path of the file to open within this collection 343 344 * mode: str --- The mode to open this file. Supports all the same 345 values as `builtins.open`. 346 347 * encoding: str | None --- The text encoding of the file. Only used 348 when the file is opened in text mode. 
The default is 349 platform-dependent. 350 351 """ 352 if not re.search(r'^[rwa][bt]?\+?$', mode): 353 raise errors.ArgumentError("Invalid mode {!r}".format(mode)) 354 355 if mode[0] == 'r' and '+' not in mode: 356 fclass = ArvadosFileReader 357 arvfile = self.find(path) 358 elif not self.writable(): 359 raise IOError(errno.EROFS, "Collection is read only") 360 else: 361 fclass = ArvadosFileWriter 362 arvfile = self.find_or_create(path, FILE) 363 364 if arvfile is None: 365 raise IOError(errno.ENOENT, "File not found", path) 366 if not isinstance(arvfile, ArvadosFile): 367 raise IOError(errno.EISDIR, "Is a directory", path) 368 369 if mode[0] == 'w': 370 arvfile.truncate(0) 371 372 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) 373 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) 374 if 'b' not in mode: 375 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader 376 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) 377 return f 378 379 def modified(self) -> bool: 380 """Indicate whether this collection has an API server record 381 382 Returns `False` if this collection corresponds to a record loaded from 383 the API server, `True` otherwise. 384 """ 385 return not self.committed() 386 387 @synchronized 388 def committed(self): 389 """Indicate whether this collection has an API server record 390 391 Returns `True` if this collection corresponds to a record loaded from 392 the API server, `False` otherwise. 393 """ 394 return self._committed 395 396 @synchronized 397 def set_committed(self, value: bool=True): 398 """Cache whether this collection has an API server record 399 400 .. ATTENTION:: Internal 401 This method is only meant to be used by other Collection methods. 402 403 Set this collection's cached "committed" flag to the given 404 value and propagates it as needed. 
405 """ 406 if value == self._committed: 407 return 408 if value: 409 for k,v in self._items.items(): 410 v.set_committed(True) 411 self._committed = True 412 else: 413 self._committed = False 414 if self.parent is not None: 415 self.parent.set_committed(False) 416 417 @synchronized 418 def __iter__(self) -> Iterator[str]: 419 """Iterate names of streams and files in this collection 420 421 This method does not recurse. It only iterates the contents of this 422 collection's corresponding stream. 423 """ 424 return iter(self._items) 425 426 @synchronized 427 def __getitem__(self, k: str) -> CollectionItem: 428 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection 429 430 This method does not recurse. If you want to search a path, use 431 `RichCollectionBase.find` instead. 432 """ 433 return self._items[k] 434 435 @synchronized 436 def __contains__(self, k: str) -> bool: 437 """Indicate whether this collection has an item with this name 438 439 This method does not recurse. It you want to check a path, use 440 `RichCollectionBase.exists` instead. 441 """ 442 return k in self._items 443 444 @synchronized 445 def __len__(self): 446 """Get the number of items directly contained in this collection 447 448 This method does not recurse. It only counts the streams and files 449 in this collection's corresponding stream. 450 """ 451 return len(self._items) 452 453 @must_be_writable 454 @synchronized 455 def __delitem__(self, p: str) -> None: 456 """Delete an item from this collection's stream 457 458 This method does not recurse. If you want to remove an item by a 459 path, use `RichCollectionBase.remove` instead. 460 """ 461 del self._items[p] 462 self.set_committed(False) 463 self.notify(DEL, self, p, None) 464 465 @synchronized 466 def keys(self) -> Iterator[str]: 467 """Iterate names of streams and files in this collection 468 469 This method does not recurse. It only iterates the contents of this 470 collection's corresponding stream. 
471 """ 472 return self._items.keys() 473 474 @synchronized 475 def values(self) -> List[CollectionItem]: 476 """Get a list of objects in this collection's stream 477 478 The return value includes a `Subcollection` for every stream, and an 479 `arvados.arvfile.ArvadosFile` for every file, directly within this 480 collection's stream. This method does not recurse. 481 """ 482 return list(self._items.values()) 483 484 @synchronized 485 def items(self) -> List[Tuple[str, CollectionItem]]: 486 """Get a list of `(name, object)` tuples from this collection's stream 487 488 The return value includes a `Subcollection` for every stream, and an 489 `arvados.arvfile.ArvadosFile` for every file, directly within this 490 collection's stream. This method does not recurse. 491 """ 492 return list(self._items.items()) 493 494 def exists(self, path: str) -> bool: 495 """Indicate whether this collection includes an item at `path` 496 497 This method returns `True` if `path` refers to a stream or file within 498 this collection, else `False`. 499 500 Arguments: 501 502 * path: str --- The path to check for existence within this collection 503 """ 504 return self.find(path) is not None 505 506 @must_be_writable 507 @synchronized 508 def remove(self, path: str, recursive: bool=False) -> None: 509 """Remove the file or stream at `path` 510 511 Arguments: 512 513 * path: str --- The path of the item to remove from the collection 514 515 * recursive: bool --- Controls the method's behavior if `path` refers 516 to a nonempty stream. If `False` (the default), this method raises 517 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all 518 items under the stream. 
519 """ 520 if not path: 521 raise errors.ArgumentError("Parameter 'path' is empty.") 522 523 pathcomponents = path.split("/", 1) 524 item = self._items.get(pathcomponents[0]) 525 if item is None: 526 raise IOError(errno.ENOENT, "File not found", path) 527 if len(pathcomponents) == 1: 528 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: 529 raise IOError(errno.ENOTEMPTY, "Directory not empty", path) 530 deleteditem = self._items[pathcomponents[0]] 531 del self._items[pathcomponents[0]] 532 self.set_committed(False) 533 self.notify(DEL, self, pathcomponents[0], deleteditem) 534 else: 535 item.remove(pathcomponents[1], recursive=recursive) 536 537 def _clonefrom(self, source): 538 for k,v in source.items(): 539 self._items[k] = v.clone(self, k) 540 541 def clone(self): 542 raise NotImplementedError() 543 544 @must_be_writable 545 @synchronized 546 def add( 547 self, 548 source_obj: CollectionItem, 549 target_name: str, 550 overwrite: bool=False, 551 reparent: bool=False, 552 ) -> None: 553 """Copy or move a file or subcollection object to this collection 554 555 Arguments: 556 557 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection 558 to add to this collection 559 560 * target_name: str --- The path inside this collection where 561 `source_obj` should be added. 562 563 * overwrite: bool --- Controls the behavior of this method when the 564 collection already contains an object at `target_name`. If `False` 565 (the default), this method will raise `FileExistsError`. If `True`, 566 the object at `target_name` will be replaced with `source_obj`. 567 568 * reparent: bool --- Controls whether this method copies or moves 569 `source_obj`. If `False` (the default), `source_obj` is copied into 570 this collection. If `True`, `source_obj` is moved into this 571 collection. 
572 """ 573 if target_name in self and not overwrite: 574 raise IOError(errno.EEXIST, "File already exists", target_name) 575 576 modified_from = None 577 if target_name in self: 578 modified_from = self[target_name] 579 580 # Actually make the move or copy. 581 if reparent: 582 source_obj._reparent(self, target_name) 583 item = source_obj 584 else: 585 item = source_obj.clone(self, target_name) 586 587 self._items[target_name] = item 588 self.set_committed(False) 589 if not self._has_remote_blocks and source_obj.has_remote_blocks(): 590 self.set_has_remote_blocks(True) 591 592 if modified_from: 593 self.notify(MOD, self, target_name, (modified_from, item)) 594 else: 595 self.notify(ADD, self, target_name, item) 596 597 def _get_src_target(self, source, target_path, source_collection, create_dest): 598 if source_collection is None: 599 source_collection = self 600 601 # Find the object 602 if isinstance(source, str): 603 source_obj = source_collection.find(source) 604 if source_obj is None: 605 raise IOError(errno.ENOENT, "File not found", source) 606 sourcecomponents = source.split("/") 607 else: 608 source_obj = source 609 sourcecomponents = None 610 611 # Find parent collection the target path 612 targetcomponents = target_path.split("/") 613 614 # Determine the name to use. 615 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1] 616 617 if not target_name: 618 raise errors.ArgumentError("Target path is empty and source is an object. 
Cannot determine destination filename to use.") 619 620 if create_dest: 621 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) 622 else: 623 if len(targetcomponents) > 1: 624 target_dir = self.find("/".join(targetcomponents[0:-1])) 625 else: 626 target_dir = self 627 628 if target_dir is None: 629 raise IOError(errno.ENOENT, "Target directory not found", target_name) 630 631 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents: 632 target_dir = target_dir[target_name] 633 target_name = sourcecomponents[-1] 634 635 return (source_obj, target_dir, target_name) 636 637 @must_be_writable 638 @synchronized 639 def copy( 640 self, 641 source: Union[str, CollectionItem], 642 target_path: str, 643 source_collection: Optional['RichCollectionBase']=None, 644 overwrite: bool=False, 645 ) -> None: 646 """Copy a file or subcollection object to this collection 647 648 Arguments: 649 650 * source: str | arvados.arvfile.ArvadosFile | 651 arvados.collection.Subcollection --- The file or subcollection to 652 add to this collection. If `source` is a str, the object will be 653 found by looking up this path from `source_collection` (see 654 below). 655 656 * target_path: str --- The path inside this collection where the 657 source object should be added. 658 659 * source_collection: arvados.collection.Collection | None --- The 660 collection to find the source object from when `source` is a 661 path. Defaults to the current collection (`self`). 662 663 * overwrite: bool --- Controls the behavior of this method when the 664 collection already contains an object at `target_path`. If `False` 665 (the default), this method will raise `FileExistsError`. If `True`, 666 the object at `target_path` will be replaced with `source_obj`. 
667 """ 668 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) 669 target_dir.add(source_obj, target_name, overwrite, False) 670 671 @must_be_writable 672 @synchronized 673 def rename( 674 self, 675 source: Union[str, CollectionItem], 676 target_path: str, 677 source_collection: Optional['RichCollectionBase']=None, 678 overwrite: bool=False, 679 ) -> None: 680 """Move a file or subcollection object to this collection 681 682 Arguments: 683 684 * source: str | arvados.arvfile.ArvadosFile | 685 arvados.collection.Subcollection --- The file or subcollection to 686 add to this collection. If `source` is a str, the object will be 687 found by looking up this path from `source_collection` (see 688 below). 689 690 * target_path: str --- The path inside this collection where the 691 source object should be added. 692 693 * source_collection: arvados.collection.Collection | None --- The 694 collection to find the source object from when `source` is a 695 path. Defaults to the current collection (`self`). 696 697 * overwrite: bool --- Controls the behavior of this method when the 698 collection already contains an object at `target_path`. If `False` 699 (the default), this method will raise `FileExistsError`. If `True`, 700 the object at `target_path` will be replaced with `source_obj`. 701 """ 702 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) 703 if not source_obj.writable(): 704 raise IOError(errno.EROFS, "Source collection is read only", source) 705 target_dir.add(source_obj, target_name, overwrite, True) 706 707 def portable_manifest_text(self, stream_name: str=".") -> str: 708 """Get the portable manifest text for this collection 709 710 The portable manifest text is normalized, and does not include access 711 tokens. This method does not flush outstanding blocks to Keep. 
712 713 Arguments: 714 715 * stream_name: str --- The name to use for this collection's stream in 716 the generated manifest. Default `'.'`. 717 """ 718 return self._get_manifest_text(stream_name, True, True) 719 720 @synchronized 721 def manifest_text( 722 self, 723 stream_name: str=".", 724 strip: bool=False, 725 normalize: bool=False, 726 only_committed: bool=False, 727 ) -> str: 728 """Get the manifest text for this collection 729 730 Arguments: 731 732 * stream_name: str --- The name to use for this collection's stream in 733 the generated manifest. Default `'.'`. 734 735 * strip: bool --- Controls whether or not the returned manifest text 736 includes access tokens. If `False` (the default), the manifest text 737 will include access tokens. If `True`, the manifest text will not 738 include access tokens. 739 740 * normalize: bool --- Controls whether or not the returned manifest 741 text is normalized. Default `False`. 742 743 * only_committed: bool --- Controls whether or not this method uploads 744 pending data to Keep before building and returning the manifest text. 745 If `False` (the default), this method will finish uploading all data 746 to Keep, then return the final manifest. If `True`, this method will 747 build and return a manifest that only refers to the data that has 748 finished uploading at the time this method was called. 749 """ 750 if not only_committed: 751 self._my_block_manager().commit_all() 752 return self._get_manifest_text(stream_name, strip, normalize, 753 only_committed=only_committed) 754 755 @synchronized 756 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 757 """Get the manifest text for this collection, sub collections and files. 758 759 :stream_name: 760 Name to use for this stream (directory) 761 762 :strip: 763 If True, remove signing tokens from block locators if present. 764 If False (default), block locators are left unchanged. 
765 766 :normalize: 767 If True, always export the manifest text in normalized form 768 even if the Collection is not modified. If False (default) and the collection 769 is not modified, return the original manifest text even if it is not 770 in normalized form. 771 772 :only_committed: 773 If True, only include blocks that were already committed to Keep. 774 775 """ 776 777 if not self.committed() or self._manifest_text is None or normalize: 778 stream = {} 779 buf = [] 780 sorted_keys = sorted(self.keys()) 781 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]: 782 # Create a stream per file `k` 783 arvfile = self[filename] 784 filestream = [] 785 for segment in arvfile.segments(): 786 loc = segment.locator 787 if arvfile.parent._my_block_manager().is_bufferblock(loc): 788 if only_committed: 789 continue 790 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator() 791 if strip: 792 loc = KeepLocator(loc).stripped() 793 filestream.append(streams.LocatorAndRange( 794 loc, 795 KeepLocator(loc).size, 796 segment.segment_offset, 797 segment.range_size, 798 )) 799 stream[filename] = filestream 800 if stream: 801 buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n") 802 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]: 803 buf.append(self[dirname].manifest_text( 804 stream_name=os.path.join(stream_name, dirname), 805 strip=strip, normalize=True, only_committed=only_committed)) 806 return "".join(buf) 807 else: 808 if strip: 809 return self.stripped_manifest() 810 else: 811 return self._manifest_text 812 813 @synchronized 814 def _copy_remote_blocks(self, remote_blocks={}): 815 """Scan through the entire collection and ask Keep to copy remote blocks. 816 817 When accessing a remote collection, blocks will have a remote signature 818 (+R instead of +A). Collect these signatures and request Keep to copy the 819 blocks to the local cluster, returning local (+A) signatures. 
820 821 :remote_blocks: 822 Shared cache of remote to local block mappings. This is used to avoid 823 doing extra work when blocks are shared by more than one file in 824 different subdirectories. 825 826 """ 827 for item in self: 828 remote_blocks = self[item]._copy_remote_blocks(remote_blocks) 829 return remote_blocks 830 831 @synchronized 832 def diff( 833 self, 834 end_collection: 'RichCollectionBase', 835 prefix: str=".", 836 holding_collection: Optional['Collection']=None, 837 ) -> ChangeList: 838 """Build a list of differences between this collection and another 839 840 Arguments: 841 842 * end_collection: arvados.collection.RichCollectionBase --- A 843 collection object with the desired end state. The returned diff 844 list will describe how to go from the current collection object 845 `self` to `end_collection`. 846 847 * prefix: str --- The name to use for this collection's stream in 848 the diff list. Default `'.'`. 849 850 * holding_collection: arvados.collection.Collection | None --- A 851 collection object used to hold objects for the returned diff 852 list. By default, a new empty collection is created. 
853 """ 854 changes = [] 855 if holding_collection is None: 856 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep()) 857 for k in self: 858 if k not in end_collection: 859 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, ""))) 860 for k in end_collection: 861 if k in self: 862 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection): 863 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection)) 864 elif end_collection[k] != self[k]: 865 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 866 else: 867 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 868 else: 869 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, ""))) 870 return changes 871 872 @must_be_writable 873 @synchronized 874 def apply(self, changes: ChangeList) -> None: 875 """Apply a list of changes from to this collection 876 877 This method takes a list of changes generated by 878 `RichCollectionBase.diff` and applies it to this 879 collection. Afterward, the state of this collection object will 880 match the state of `end_collection` passed to `diff`. If a change 881 conflicts with a local change, it will be saved to an alternate path 882 indicating the conflict. 883 884 Arguments: 885 886 * changes: arvados.collection.ChangeList --- The list of differences 887 generated by `RichCollectionBase.diff`. 
888 """ 889 if changes: 890 self.set_committed(False) 891 for change in changes: 892 event_type = change[0] 893 path = change[1] 894 initial = change[2] 895 local = self.find(path) 896 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S", 897 time.gmtime())) 898 if event_type == ADD: 899 if local is None: 900 # No local file at path, safe to copy over new file 901 self.copy(initial, path) 902 elif local is not None and local != initial: 903 # There is already local file and it is different: 904 # save change to conflict file. 905 self.copy(initial, conflictpath) 906 elif event_type == MOD or event_type == TOK: 907 final = change[3] 908 if local == initial: 909 # Local matches the "initial" item so it has not 910 # changed locally and is safe to update. 911 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile): 912 # Replace contents of local file with new contents 913 local.replace_contents(final) 914 else: 915 # Overwrite path with new item; this can happen if 916 # path was a file and is now a collection or vice versa 917 self.copy(final, path, overwrite=True) 918 elif event_type == MOD: 919 # Local doesn't match the "start" value or local 920 # is missing (presumably deleted) so save change 921 # to conflict file. Don't do this for TOK events 922 # which means the file didn't change but only had 923 # tokens updated. 924 self.copy(final, conflictpath) 925 elif event_type == DEL: 926 if local == initial: 927 # Local item matches "initial" value, so it is safe to remove. 928 self.remove(path, recursive=True) 929 # else, the file is modified or already removed, in either 930 # case we don't want to try to remove it. 931 932 def portable_data_hash(self) -> str: 933 """Get the portable data hash for this collection's manifest""" 934 if self._manifest_locator and self.committed(): 935 # If the collection is already saved on the API server, and it's committed 936 # then return API server's PDH response. 
937 return self._portable_data_hash 938 else: 939 stripped = self.portable_manifest_text().encode() 940 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) 941 942 @synchronized 943 def subscribe(self, callback: ChangeCallback) -> None: 944 """Set a notify callback for changes to this collection 945 946 Arguments: 947 948 * callback: arvados.collection.ChangeCallback --- The callable to 949 call each time the collection is changed. 950 """ 951 if self._callback is None: 952 self._callback = callback 953 else: 954 raise errors.ArgumentError("A callback is already set on this collection.") 955 956 @synchronized 957 def unsubscribe(self) -> None: 958 """Remove any notify callback set for changes to this collection""" 959 if self._callback is not None: 960 self._callback = None 961 962 @synchronized 963 def notify( 964 self, 965 event: ChangeType, 966 collection: 'RichCollectionBase', 967 name: str, 968 item: CollectionItem, 969 ) -> None: 970 """Notify any subscribed callback about a change to this collection 971 972 .. ATTENTION:: Internal 973 This method is only meant to be used by other Collection methods. 974 975 If a callback has been registered with `RichCollectionBase.subscribe`, 976 it will be called with information about a change to this collection. 977 Then this notification will be propagated to this collection's root. 978 979 Arguments: 980 981 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to 982 the collection. 983 984 * collection: arvados.collection.RichCollectionBase --- The 985 collection that was modified. 986 987 * name: str --- The name of the file or stream within `collection` that 988 was modified. 989 990 * item: arvados.arvfile.ArvadosFile | 991 arvados.collection.Subcollection --- For ADD events, the new 992 contents at `name` within `collection`; for DEL events, the 993 item that was removed. 
For MOD and TOK events, a 2-tuple of 994 the previous item and the new item (may be the same object 995 or different, depending on whether the action involved it 996 being modified in place or replaced). 997 998 """ 999 if self._callback: 1000 self._callback(event, collection, name, item) 1001 self.root_collection().notify(event, collection, name, item) 1002 1003 @synchronized 1004 def __eq__(self, other: Any) -> bool: 1005 """Indicate whether this collection object is equal to another""" 1006 if other is self: 1007 return True 1008 if not isinstance(other, RichCollectionBase): 1009 return False 1010 if len(self._items) != len(other): 1011 return False 1012 for k in self._items: 1013 if k not in other: 1014 return False 1015 if self._items[k] != other[k]: 1016 return False 1017 return True 1018 1019 def __ne__(self, other: Any) -> bool: 1020 """Indicate whether this collection object is not equal to another""" 1021 return not self.__eq__(other) 1022 1023 @synchronized 1024 def flush(self) -> None: 1025 """Upload any pending data to Keep""" 1026 for e in self.values(): 1027 e.flush() 1028 1029 1030class Collection(RichCollectionBase): 1031 """Read and manipulate an Arvados collection 1032 1033 This class provides a high-level interface to create, read, and update 1034 Arvados collections and their contents. Refer to the Arvados Python SDK 1035 cookbook for [an introduction to using the Collection class][cookbook]. 
1036 1037 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 1038 """ 1039 1040 def __init__(self, manifest_locator_or_text: Optional[str]=None, 1041 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, 1042 keep_client: Optional['arvados.keep.KeepClient']=None, 1043 num_retries: int=10, 1044 parent: Optional['Collection']=None, 1045 apiconfig: Optional[Mapping[str, str]]=None, 1046 block_manager: Optional['arvados.arvfile._BlockManager']=None, 1047 replication_desired: Optional[int]=None, 1048 storage_classes_desired: Optional[List[str]]=None, 1049 put_threads: Optional[int]=None): 1050 """Initialize a Collection object 1051 1052 Arguments: 1053 1054 * manifest_locator_or_text: str | None --- This string can contain a 1055 collection manifest text, portable data hash, or UUID. When given a 1056 portable data hash or UUID, this instance will load a collection 1057 record from the API server. Otherwise, this instance will represent a 1058 new collection without an API server record. The default value `None` 1059 instantiates a new collection with an empty manifest. 1060 1061 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The 1062 Arvados API client object this instance uses to make requests. If 1063 none is given, this instance creates its own client using the 1064 settings from `apiconfig` (see below). If your client instantiates 1065 many Collection objects, you can help limit memory utilization by 1066 calling `arvados.api.api` to construct an 1067 `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client` 1068 for every Collection. 1069 1070 * keep_client: arvados.keep.KeepClient | None --- The Keep client 1071 object this instance uses to make requests. If none is given, this 1072 instance creates its own client using its `api_client`. 1073 1074 * num_retries: int --- The number of times that client requests are 1075 retried. Default 10. 
1076 1077 * parent: arvados.collection.Collection | None --- The parent Collection 1078 object of this instance, if any. This argument is primarily used by 1079 other Collection methods; user client code shouldn't need to use it. 1080 1081 * apiconfig: Mapping[str, str] | None --- A mapping with entries for 1082 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally 1083 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the 1084 Collection object constructs one from these settings. If no 1085 mapping is provided, calls `arvados.config.settings` to get these 1086 parameters from user configuration. 1087 1088 * block_manager: arvados.arvfile._BlockManager | None --- The 1089 _BlockManager object used by this instance to coordinate reading 1090 and writing Keep data blocks. If none is given, this instance 1091 constructs its own. This argument is primarily used by other 1092 Collection methods; user client code shouldn't need to use it. 1093 1094 * replication_desired: int | None --- This controls both the value of 1095 the `replication_desired` field on API collection records saved by 1096 this class, as well as the number of Keep services that the object 1097 writes new data blocks to. If none is given, uses the default value 1098 configured for the cluster. 1099 1100 * storage_classes_desired: list[str] | None --- This controls both 1101 the value of the `storage_classes_desired` field on API collection 1102 records saved by this class, as well as selecting which specific 1103 Keep services the object writes new data blocks to. If none is 1104 given, defaults to an empty list. 1105 1106 * put_threads: int | None --- The number of threads to run 1107 simultaneously to upload data blocks to Keep. This value is used when 1108 building a new `block_manager`. It is unused when a `block_manager` 1109 is provided. 
1110 """ 1111 1112 if storage_classes_desired and type(storage_classes_desired) is not list: 1113 raise errors.ArgumentError("storage_classes_desired must be list type.") 1114 1115 super(Collection, self).__init__(parent) 1116 self._api_client = api_client 1117 self._keep_client = keep_client 1118 1119 # Use the keep client from ThreadSafeAPIClient 1120 if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient): 1121 self._keep_client = self._api_client.keep 1122 1123 self._block_manager = block_manager 1124 self.replication_desired = replication_desired 1125 self._storage_classes_desired = storage_classes_desired 1126 self.put_threads = put_threads 1127 1128 if apiconfig: 1129 self._config = apiconfig 1130 else: 1131 self._config = config.settings() 1132 1133 self.num_retries = num_retries 1134 self._manifest_locator = None 1135 self._manifest_text = None 1136 self._portable_data_hash = None 1137 self._api_response = None 1138 self._token_refresh_timestamp = 0 1139 1140 self.lock = threading.RLock() 1141 self.events = None 1142 1143 if manifest_locator_or_text: 1144 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1145 self._manifest_locator = manifest_locator_or_text 1146 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1147 self._manifest_locator = manifest_locator_or_text 1148 if not self._has_local_collection_uuid(): 1149 self._has_remote_blocks = True 1150 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1151 self._manifest_text = manifest_locator_or_text 1152 if '+R' in self._manifest_text: 1153 self._has_remote_blocks = True 1154 else: 1155 raise errors.ArgumentError( 1156 "Argument to CollectionReader is not a manifest or a collection UUID") 1157 1158 try: 1159 self._populate() 1160 except errors.SyntaxError as e: 1161 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None 1162 1163 def storage_classes_desired(self) -> 
List[str]: 1164 """Get this collection's `storage_classes_desired` value""" 1165 return self._storage_classes_desired or [] 1166 1167 def root_collection(self) -> 'Collection': 1168 return self 1169 1170 def get_properties(self) -> Properties: 1171 """Get this collection's properties 1172 1173 This method always returns a dict. If this collection object does not 1174 have an associated API record, or that record does not have any 1175 properties set, this method returns an empty dict. 1176 """ 1177 if self._api_response and self._api_response["properties"]: 1178 return self._api_response["properties"] 1179 else: 1180 return {} 1181 1182 def get_trash_at(self) -> Optional[datetime.datetime]: 1183 """Get this collection's `trash_at` field 1184 1185 This method parses the `trash_at` field of the collection's API 1186 record and returns a datetime from it. If that field is not set, or 1187 this collection object does not have an associated API record, 1188 returns None. 1189 """ 1190 if self._api_response and self._api_response["trash_at"]: 1191 try: 1192 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1193 except ValueError: 1194 return None 1195 else: 1196 return None 1197 1198 def stream_name(self) -> str: 1199 return "." 1200 1201 def writable(self) -> bool: 1202 return True 1203 1204 @synchronized 1205 @retry_method 1206 def update( 1207 self, 1208 other: Optional['Collection']=None, 1209 num_retries: Optional[int]=None, 1210 ) -> None: 1211 """Merge another collection's contents into this one 1212 1213 This method compares the manifest of this collection instance with 1214 another, then updates this instance's manifest with changes from the 1215 other, renaming files to flag conflicts where necessary. 1216 1217 When called without any arguments, this method reloads the collection's 1218 API record, and updates this instance with any changes that have 1219 appeared server-side. 
If this instance does not have a corresponding 1220 API record, this method raises `arvados.errors.ArgumentError`. 1221 1222 Arguments: 1223 1224 * other: arvados.collection.Collection | None --- The collection 1225 whose contents should be merged into this instance. When not 1226 provided, this method reloads this collection's API record and 1227 constructs a Collection object from it. If this instance does not 1228 have a corresponding API record, this method raises 1229 `arvados.errors.ArgumentError`. 1230 1231 * num_retries: int | None --- The number of times to retry reloading 1232 the collection's API record from the API server. If not specified, 1233 uses the `num_retries` provided when this instance was constructed. 1234 """ 1235 1236 token_refresh_period = 60*60 1237 time_since_last_token_refresh = (time.time() - self._token_refresh_timestamp) 1238 upstream_response = None 1239 1240 if other is None: 1241 if self._manifest_locator is None: 1242 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1243 1244 if re.match(arvados.util.portable_data_hash_pattern, self._manifest_locator) and time_since_last_token_refresh < token_refresh_period: 1245 return 1246 1247 upstream_response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1248 other = CollectionReader(upstream_response["manifest_text"]) 1249 1250 if self.committed(): 1251 # 1st case, no local changes, content is the same 1252 if self.portable_data_hash() == other.portable_data_hash() and time_since_last_token_refresh < token_refresh_period: 1253 # No difference in content. Remember the API record 1254 # (metadata such as name or properties may have changed) 1255 # but don't update the token refresh timestamp. 
1256 if upstream_response is not None: 1257 self._remember_api_response(upstream_response) 1258 return 1259 1260 # 2nd case, no local changes, but either upstream changed 1261 # or we want to refresh tokens. 1262 1263 self.apply(self.diff(other)) 1264 if upstream_response is not None: 1265 self._remember_api_response(upstream_response) 1266 self._update_token_timestamp() 1267 self.set_committed(True) 1268 return 1269 1270 # 3rd case, upstream changed, but we also have uncommitted 1271 # changes that we want to incorporate so they don't get lost. 1272 1273 # _manifest_text stores the text from last time we received a 1274 # record from the API server. This is the state of the 1275 # collection before our uncommitted changes. 1276 baseline = Collection(self._manifest_text) 1277 1278 # Get the set of changes between our baseline and the other 1279 # collection and apply them to self. 1280 # 1281 # If a file was modified in both 'self' and 'other', the 1282 # 'apply' method keeps the contents of 'self' and creates a 1283 # conflict file with the contents of 'other'. 
1284 self.apply(baseline.diff(other)) 1285 1286 # Remember the new baseline, changes to a file 1287 if upstream_response is not None: 1288 self._remember_api_response(upstream_response) 1289 1290 1291 @synchronized 1292 def _my_api(self): 1293 if self._api_client is None: 1294 self._api_client = ThreadSafeAPIClient(self._config, version='v1') 1295 if self._keep_client is None: 1296 self._keep_client = self._api_client.keep 1297 return self._api_client 1298 1299 @synchronized 1300 def _my_keep(self): 1301 if self._keep_client is None: 1302 if self._api_client is None: 1303 self._my_api() 1304 else: 1305 self._keep_client = KeepClient(api_client=self._api_client) 1306 return self._keep_client 1307 1308 @synchronized 1309 def _my_block_manager(self): 1310 if self._block_manager is None: 1311 copies = (self.replication_desired or 1312 self._my_api()._rootDesc.get('defaultCollectionReplication', 1313 2)) 1314 self._block_manager = _BlockManager(self._my_keep(), 1315 copies=copies, 1316 put_threads=self.put_threads, 1317 num_retries=self.num_retries, 1318 storage_classes_func=self.storage_classes_desired) 1319 return self._block_manager 1320 1321 def _remember_api_response(self, response): 1322 self._api_response = response 1323 self._manifest_text = self._api_response['manifest_text'] 1324 self._portable_data_hash = self._api_response['portable_data_hash'] 1325 1326 def _update_token_timestamp(self): 1327 self._token_refresh_timestamp = time.time() 1328 1329 def _populate_from_api_server(self): 1330 # As in KeepClient itself, we must wait until the last 1331 # possible moment to instantiate an API client, in order to 1332 # avoid tripping up clients that don't have access to an API 1333 # server. If we do build one, make sure our Keep client uses 1334 # it. If instantiation fails, we'll fall back to the except 1335 # clause, just like any other Collection lookup 1336 # failure. Return an exception, or None if successful. 
1337 self._remember_api_response(self._my_api().collections().get( 1338 uuid=self._manifest_locator).execute( 1339 num_retries=self.num_retries)) 1340 1341 # If not overriden via kwargs, we should try to load the 1342 # replication_desired and storage_classes_desired from the API server 1343 if self.replication_desired is None: 1344 self.replication_desired = self._api_response.get('replication_desired', None) 1345 if self._storage_classes_desired is None: 1346 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None) 1347 1348 def _populate(self): 1349 if self._manifest_text is None: 1350 if self._manifest_locator is None: 1351 return 1352 else: 1353 self._populate_from_api_server() 1354 self._baseline_manifest = self._manifest_text 1355 self._import_manifest(self._manifest_text) 1356 1357 def _has_collection_uuid(self): 1358 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) 1359 1360 def _has_local_collection_uuid(self): 1361 return self._has_collection_uuid and \ 1362 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0] 1363 1364 def __enter__(self): 1365 return self 1366 1367 def __exit__(self, exc_type, exc_value, traceback): 1368 """Exit a context with this collection instance 1369 1370 If no exception was raised inside the context block, and this 1371 collection is writable and has a corresponding API record, that 1372 record will be updated to match the state of this instance at the end 1373 of the block. 
1374 """ 1375 if exc_type is None: 1376 if self.writable() and self._has_collection_uuid(): 1377 self.save() 1378 self.stop_threads() 1379 1380 def stop_threads(self) -> None: 1381 """Stop background Keep upload/download threads""" 1382 if self._block_manager is not None: 1383 self._block_manager.stop_threads() 1384 1385 @synchronized 1386 def manifest_locator(self) -> Optional[str]: 1387 """Get this collection's manifest locator, if any 1388 1389 * If this collection instance is associated with an API record with a 1390 UUID, return that. 1391 * Otherwise, if this collection instance was loaded from an API record 1392 by portable data hash, return that. 1393 * Otherwise, return `None`. 1394 """ 1395 return self._manifest_locator 1396 1397 @synchronized 1398 def clone( 1399 self, 1400 new_parent: Optional['Collection']=None, 1401 new_name: Optional[str]=None, 1402 readonly: bool=False, 1403 new_config: Optional[Mapping[str, str]]=None, 1404 ) -> 'Collection': 1405 """Create a Collection object with the same contents as this instance 1406 1407 This method creates a new Collection object with contents that match 1408 this instance's. The new collection will not be associated with any API 1409 record. 1410 1411 Arguments: 1412 1413 * new_parent: arvados.collection.Collection | None --- This value is 1414 passed to the new Collection's constructor as the `parent` 1415 argument. 1416 1417 * new_name: str | None --- This value is unused. 1418 1419 * readonly: bool --- If this value is true, this method constructs and 1420 returns a `CollectionReader`. Otherwise, it returns a mutable 1421 `Collection`. Default `False`. 1422 1423 * new_config: Mapping[str, str] | None --- This value is passed to the 1424 new Collection's constructor as `apiconfig`. If no value is provided, 1425 defaults to the configuration passed to this instance's constructor. 
1426 """ 1427 if new_config is None: 1428 new_config = self._config 1429 if readonly: 1430 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1431 else: 1432 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1433 1434 newcollection._clonefrom(self) 1435 return newcollection 1436 1437 @synchronized 1438 def api_response(self) -> Optional[Dict[str, Any]]: 1439 """Get this instance's associated API record 1440 1441 If this Collection instance has an associated API record, return it. 1442 Otherwise, return `None`. 1443 """ 1444 return self._api_response 1445 1446 def find_or_create( 1447 self, 1448 path: str, 1449 create_type: CreateType, 1450 ) -> CollectionItem: 1451 if path == ".": 1452 return self 1453 else: 1454 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type) 1455 1456 def find(self, path: str) -> CollectionItem: 1457 if path == ".": 1458 return self 1459 else: 1460 return super(Collection, self).find(path[2:] if path.startswith("./") else path) 1461 1462 def remove(self, path: str, recursive: bool=False) -> None: 1463 if path == ".": 1464 raise errors.ArgumentError("Cannot remove '.'") 1465 else: 1466 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive) 1467 1468 @must_be_writable 1469 @synchronized 1470 @retry_method 1471 def save( 1472 self, 1473 properties: Optional[Properties]=None, 1474 storage_classes: Optional[StorageClasses]=None, 1475 trash_at: Optional[datetime.datetime]=None, 1476 merge: bool=True, 1477 num_retries: Optional[int]=None, 1478 preserve_version: bool=False, 1479 ) -> str: 1480 """Save collection to an existing API record 1481 1482 This method updates the instance's corresponding API record to match 1483 the instance's state. If this instance does not have a corresponding API 1484 record yet, raises `AssertionError`. (To create a new API record, use 1485 `Collection.save_new`.) 
This method returns the saved collection 1486 manifest. 1487 1488 Arguments: 1489 1490 * properties: dict[str, Any] | None --- If provided, the API record will 1491 be updated with these properties. Note this will completely replace 1492 any existing properties. 1493 1494 * storage_classes: list[str] | None --- If provided, the API record will 1495 be updated with this value in the `storage_classes_desired` field. 1496 This value will also be saved on the instance and used for any 1497 changes that follow. 1498 1499 * trash_at: datetime.datetime | None --- If provided, the API record 1500 will be updated with this value in the `trash_at` field. 1501 1502 * merge: bool --- If `True` (the default), this method will first 1503 reload this collection's API record, and merge any new contents into 1504 this instance before saving changes. See `Collection.update` for 1505 details. 1506 1507 * num_retries: int | None --- The number of times to retry reloading 1508 the collection's API record from the API server. If not specified, 1509 uses the `num_retries` provided when this instance was constructed. 1510 1511 * preserve_version: bool --- This value will be passed to directly 1512 to the underlying API call. If `True`, the Arvados API will 1513 preserve the versions of this collection both immediately before 1514 and after the update. If `True` when the API server is not 1515 configured with collection versioning, this method raises 1516 `arvados.errors.ArgumentError`. 
1517 """ 1518 if properties and type(properties) is not dict: 1519 raise errors.ArgumentError("properties must be dictionary type.") 1520 1521 if storage_classes and type(storage_classes) is not list: 1522 raise errors.ArgumentError("storage_classes must be list type.") 1523 if storage_classes: 1524 self._storage_classes_desired = storage_classes 1525 1526 if trash_at and type(trash_at) is not datetime.datetime: 1527 raise errors.ArgumentError("trash_at must be datetime type.") 1528 1529 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1530 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1531 1532 body={} 1533 if properties: 1534 body["properties"] = properties 1535 if self.storage_classes_desired(): 1536 body["storage_classes_desired"] = self.storage_classes_desired() 1537 if trash_at: 1538 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1539 body["trash_at"] = t 1540 if preserve_version: 1541 body["preserve_version"] = preserve_version 1542 1543 if not self.committed(): 1544 if self._has_remote_blocks: 1545 # Copy any remote blocks to the local cluster. 1546 self._copy_remote_blocks(remote_blocks={}) 1547 self._has_remote_blocks = False 1548 if not self._has_collection_uuid(): 1549 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") 1550 elif not self._has_local_collection_uuid(): 1551 raise AssertionError("Collection manifest_locator is from a remote cluster. 
Use save_new() to save it on the local cluster.") 1552 1553 self._my_block_manager().commit_all() 1554 1555 if merge: 1556 self.update() 1557 1558 text = self.manifest_text(strip=False) 1559 body['manifest_text'] = text 1560 1561 self._remember_api_response(self._my_api().collections().update( 1562 uuid=self._manifest_locator, 1563 body=body 1564 ).execute(num_retries=num_retries)) 1565 self.set_committed(True) 1566 elif body: 1567 self._remember_api_response(self._my_api().collections().update( 1568 uuid=self._manifest_locator, 1569 body=body 1570 ).execute(num_retries=num_retries)) 1571 1572 return self._manifest_text 1573 1574 1575 @must_be_writable 1576 @synchronized 1577 @retry_method 1578 def save_new( 1579 self, 1580 name: Optional[str]=None, 1581 create_collection_record: bool=True, 1582 owner_uuid: Optional[str]=None, 1583 properties: Optional[Properties]=None, 1584 storage_classes: Optional[StorageClasses]=None, 1585 trash_at: Optional[datetime.datetime]=None, 1586 ensure_unique_name: bool=False, 1587 num_retries: Optional[int]=None, 1588 preserve_version: bool=False, 1589 ): 1590 """Save collection to a new API record 1591 1592 This method finishes uploading new data blocks and (optionally) 1593 creates a new API collection record with the provided data. If a new 1594 record is created, this instance becomes associated with that record 1595 for future updates like `save()`. This method returns the saved 1596 collection manifest. 1597 1598 Arguments: 1599 1600 * name: str | None --- The `name` field to use on the new collection 1601 record. If not specified, a generic default name is generated. 1602 1603 * create_collection_record: bool --- If `True` (the default), creates a 1604 collection record on the API server. If `False`, the method finishes 1605 all data uploads and only returns the resulting collection manifest 1606 without sending it to the API server. 
1607 1608 * owner_uuid: str | None --- The `owner_uuid` field to use on the 1609 new collection record. 1610 1611 * properties: dict[str, Any] | None --- The `properties` field to use on 1612 the new collection record. 1613 1614 * storage_classes: list[str] | None --- The 1615 `storage_classes_desired` field to use on the new collection record. 1616 1617 * trash_at: datetime.datetime | None --- The `trash_at` field to use 1618 on the new collection record. 1619 1620 * ensure_unique_name: bool --- This value is passed directly to the 1621 Arvados API when creating the collection record. If `True`, the API 1622 server may modify the submitted `name` to ensure the collection's 1623 `name`+`owner_uuid` combination is unique. If `False` (the default), 1624 if a collection already exists with this same `name`+`owner_uuid` 1625 combination, creating a collection record will raise a validation 1626 error. 1627 1628 * num_retries: int | None --- The number of times to retry reloading 1629 the collection's API record from the API server. If not specified, 1630 uses the `num_retries` provided when this instance was constructed. 1631 1632 * preserve_version: bool --- This value will be passed to directly 1633 to the underlying API call. If `True`, the Arvados API will 1634 preserve the versions of this collection both immediately before 1635 and after the update. If `True` when the API server is not 1636 configured with collection versioning, this method raises 1637 `arvados.errors.ArgumentError`. 
1638 """ 1639 if properties and type(properties) is not dict: 1640 raise errors.ArgumentError("properties must be dictionary type.") 1641 1642 if storage_classes and type(storage_classes) is not list: 1643 raise errors.ArgumentError("storage_classes must be list type.") 1644 1645 if trash_at and type(trash_at) is not datetime.datetime: 1646 raise errors.ArgumentError("trash_at must be datetime type.") 1647 1648 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1649 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1650 1651 if self._has_remote_blocks: 1652 # Copy any remote blocks to the local cluster. 1653 self._copy_remote_blocks(remote_blocks={}) 1654 self._has_remote_blocks = False 1655 1656 if storage_classes: 1657 self._storage_classes_desired = storage_classes 1658 1659 self._my_block_manager().commit_all() 1660 text = self.manifest_text(strip=False) 1661 1662 if create_collection_record: 1663 if name is None: 1664 name = "New collection" 1665 ensure_unique_name = True 1666 1667 body = {"manifest_text": text, 1668 "name": name, 1669 "replication_desired": self.replication_desired} 1670 if owner_uuid: 1671 body["owner_uuid"] = owner_uuid 1672 if properties: 1673 body["properties"] = properties 1674 if self.storage_classes_desired(): 1675 body["storage_classes_desired"] = self.storage_classes_desired() 1676 if trash_at: 1677 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1678 body["trash_at"] = t 1679 if preserve_version: 1680 body["preserve_version"] = preserve_version 1681 1682 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) 1683 self._manifest_locator = self._api_response["uuid"] 1684 self.set_committed(True) 1685 1686 return text 1687 1688 _token_re = re.compile(r'(\S+)(\s+|$)') 1689 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') 1690 
_segment_re = re.compile(r'(\d+):(\d+):(\S+)') 1691 1692 def _unescape_manifest_path(self, path): 1693 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) 1694 1695 @synchronized 1696 def _import_manifest(self, manifest_text): 1697 """Import a manifest into a `Collection`. 1698 1699 :manifest_text: 1700 The manifest text to import from. 1701 1702 """ 1703 if len(self) > 0: 1704 raise ArgumentError("Can only import manifest into an empty collection") 1705 1706 STREAM_NAME = 0 1707 BLOCKS = 1 1708 SEGMENTS = 2 1709 1710 stream_name = None 1711 state = STREAM_NAME 1712 1713 for token_and_separator in self._token_re.finditer(manifest_text): 1714 tok = token_and_separator.group(1) 1715 sep = token_and_separator.group(2) 1716 1717 if state == STREAM_NAME: 1718 # starting a new stream 1719 stream_name = self._unescape_manifest_path(tok) 1720 blocks = [] 1721 segments = [] 1722 streamoffset = 0 1723 state = BLOCKS 1724 self.find_or_create(stream_name, COLLECTION) 1725 continue 1726 1727 if state == BLOCKS: 1728 block_locator = self._block_re.match(tok) 1729 if block_locator: 1730 blocksize = int(block_locator.group(1)) 1731 blocks.append(streams.Range(tok, streamoffset, blocksize, 0)) 1732 streamoffset += blocksize 1733 else: 1734 state = SEGMENTS 1735 1736 if state == SEGMENTS: 1737 file_segment = self._segment_re.match(tok) 1738 if file_segment: 1739 pos = int(file_segment.group(1)) 1740 size = int(file_segment.group(2)) 1741 name = self._unescape_manifest_path(file_segment.group(3)) 1742 if name.split('/')[-1] == '.': 1743 # placeholder for persisting an empty directory, not a real file 1744 if len(name) > 2: 1745 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) 1746 else: 1747 filepath = os.path.join(stream_name, name) 1748 try: 1749 afile = self.find_or_create(filepath, FILE) 1750 except IOError as e: 1751 if e.errno == errno.ENOTDIR: 1752 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", 
filepath) from None 1753 else: 1754 raise e from None 1755 if isinstance(afile, ArvadosFile): 1756 afile.add_segment(blocks, pos, size) 1757 else: 1758 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) 1759 else: 1760 # error! 1761 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok) 1762 1763 if sep == "\n": 1764 stream_name = None 1765 state = STREAM_NAME 1766 1767 self._update_token_timestamp() 1768 self.set_committed(True) 1769 1770 @synchronized 1771 def notify( 1772 self, 1773 event: ChangeType, 1774 collection: 'RichCollectionBase', 1775 name: str, 1776 item: CollectionItem, 1777 ) -> None: 1778 if self._callback: 1779 self._callback(event, collection, name, item) 1780 1781 1782class Subcollection(RichCollectionBase): 1783 """Read and manipulate a stream/directory within an Arvados collection 1784 1785 This class represents a single stream (like a directory) within an Arvados 1786 `Collection`. It is returned by `Collection.find` and provides the same API. 1787 Operations that work on the API collection record propagate to the parent 1788 `Collection` object. 
1789 """ 1790 1791 def __init__(self, parent, name): 1792 super(Subcollection, self).__init__(parent) 1793 self.lock = self.root_collection().lock 1794 self._manifest_text = None 1795 self.name = name 1796 self.num_retries = parent.num_retries 1797 1798 def root_collection(self) -> 'Collection': 1799 return self.parent.root_collection() 1800 1801 def writable(self) -> bool: 1802 return self.root_collection().writable() 1803 1804 def _my_api(self): 1805 return self.root_collection()._my_api() 1806 1807 def _my_keep(self): 1808 return self.root_collection()._my_keep() 1809 1810 def _my_block_manager(self): 1811 return self.root_collection()._my_block_manager() 1812 1813 def stream_name(self) -> str: 1814 return os.path.join(self.parent.stream_name(), self.name) 1815 1816 @synchronized 1817 def clone( 1818 self, 1819 new_parent: Optional['Collection']=None, 1820 new_name: Optional[str]=None, 1821 ) -> 'Subcollection': 1822 c = Subcollection(new_parent, new_name) 1823 c._clonefrom(self) 1824 return c 1825 1826 @must_be_writable 1827 @synchronized 1828 def _reparent(self, newparent, newname): 1829 self.set_committed(False) 1830 self.flush() 1831 self.parent.remove(self.name, recursive=True) 1832 self.parent = newparent 1833 self.name = newname 1834 self.lock = self.parent.root_collection().lock 1835 1836 @synchronized 1837 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 1838 """Encode empty directories by using an \056-named (".") empty file""" 1839 if len(self._items) == 0: 1840 return "%s %s 0:0:\\056\n" % ( 1841 streams.escape(stream_name), config.EMPTY_BLOCK_LOCATOR) 1842 return super(Subcollection, self)._get_manifest_text(stream_name, 1843 strip, normalize, 1844 only_committed) 1845 1846 1847class CollectionReader(Collection): 1848 """Read-only `Collection` subclass 1849 1850 This class will never create or update any API collection records. 
You can 1851 use this class for additional code safety when you only need to read 1852 existing collections. 1853 """ 1854 def __init__(self, manifest_locator_or_text, *args, **kwargs): 1855 self._in_init = True 1856 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs) 1857 self._in_init = False 1858 1859 # Forego any locking since it should never change once initialized. 1860 self.lock = NoopLock() 1861 1862 # Backwards compatability with old CollectionReader 1863 # all_streams() and all_files() 1864 self._streams = None 1865 1866 def writable(self) -> bool: 1867 return self._in_init
`create_type` value for `Collection.find_or_create`
`create_type` value for `Collection.find_or_create`
class CollectionBase(object):
    """Abstract base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __enter__(self):
        """Start a `with` block; the collection object itself is returned"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Finish a `with` block; no cleanup is performed here"""
        pass

    def _my_keep(self):
        """Return the cached Keep client, creating it lazily on first use"""
        client = self._keep_client
        if client is None:
            client = KeepClient(api_client=self._api_client,
                                num_retries=self.num_retries)
            self._keep_client = client
        return client

    def stripped_manifest(self) -> str:
        """Return this collection's manifest with only size hints on locators

        Every field after the first on each manifest line that matches
        `arvados.util.keep_locator_pattern` loses all `+` hints that do not
        start with a digit (for example permission and remote-cluster
        hints), so only size hints remain. Blank lines are dropped and every
        emitted line ends with a newline.
        """
        def _strip_hints(field):
            # Only Keep locators are rewritten; stream names and file
            # segments pass through untouched.
            if re.match(arvados.util.keep_locator_pattern, field):
                return re.sub(r'\+[^\d][^\+]*', '', field)
            return field

        pieces = []
        for line in self.manifest_text().split("\n"):
            tokens = line.split()
            if not tokens:
                continue
            stream, *rest = tokens
            pieces.append(' '.join([stream] + [_strip_hints(t) for t in rest]))
            pieces.append("\n")
        return ''.join(pieces)
Abstract base class for Collection classes
def stripped_manifest(self) -> str:
    """Return this collection's manifest with only size hints on locators

    Every field after the first on each manifest line that matches
    `arvados.util.keep_locator_pattern` loses all `+` hints that do not
    start with a digit (for example permission and remote-cluster hints),
    so only size hints remain. Blank lines are dropped and every emitted
    line ends with a newline.
    """
    def _strip_hints(field):
        # Only Keep locators are rewritten; stream names and file segments
        # pass through untouched.
        if re.match(arvados.util.keep_locator_pattern, field):
            return re.sub(r'\+[^\d][^\+]*', '', field)
        return field

    pieces = []
    for line in self.manifest_text().split("\n"):
        tokens = line.split()
        if not tokens:
            continue
        stream, *rest = tokens
        pieces.append(' '.join([stream] + [_strip_hints(t) for t in rest]))
        pieces.append("\n")
    return ''.join(pieces)
Create a copy of the collection manifest with only size hints
This method returns a string with the current collection’s manifest text with all non-portable locator hints like permission hints and remote cluster hints removed. The only hints in the returned manifest will be size hints.
class RichCollectionBase(CollectionBase):
    """Base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __init__(self, parent=None):
        self.parent = parent
        self._committed = False
        self._has_remote_blocks = False
        self._callback = None
        # Maps a name in this stream to its ArvadosFile or Subcollection.
        self._items = {}

    def _my_api(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def writable(self) -> bool:
        """Indicate whether this collection object can be modified

        This method returns `False` if this object is a `CollectionReader`,
        else `True`.
        """
        raise NotImplementedError()

    def root_collection(self) -> 'Collection':
        """Get this collection's root collection object

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the source Collection object.
        """
        raise NotImplementedError()

    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this collection

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the name of the stream you opened.
        """
        raise NotImplementedError()

    @synchronized
    def has_remote_blocks(self) -> bool:
        """Indicate whether the collection refers to remote data

        Returns `True` if the collection manifest includes any Keep locators
        with a remote hint (`+R`), else `False`.
        """
        if self._has_remote_blocks:
            return True
        for item in self:
            if self[item].has_remote_blocks():
                return True
        return False

    @synchronized
    def set_has_remote_blocks(self, val: bool) -> None:
        """Cache whether this collection refers to remote blocks

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        Set this collection's cached "has remote blocks" flag to the given
        value, and propagate the same value to the parent collection.
        """
        self._has_remote_blocks = val
        if self.parent:
            self.parent.set_has_remote_blocks(val)

    @must_be_writable
    @synchronized
    def find_or_create(
            self,
            path: str,
            create_type: CreateType,
    ) -> CollectionItem:
        """Get the item at the given path, creating it if necessary

        If `path` refers to a stream in this collection, returns a
        corresponding `Subcollection` object. If `path` refers to a file in
        this collection, returns a corresponding
        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
        this collection, then this method creates a new object and returns
        it, creating parent streams as needed. The type of object created is
        determined by the value of `create_type`.

        Arguments:

        * path: str --- The path to find or create within this collection.

        * create_type: Literal[COLLECTION, FILE] --- The type of object to
          create at `path` if one does not exist. Passing `COLLECTION`
          creates a stream and returns the corresponding
          `Subcollection`. Passing `FILE` creates a new file and returns the
          corresponding `arvados.arvfile.ArvadosFile`.

        Raises `NotADirectoryError` if a non-final component of `path` names
        an existing file.
        """
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                if item is None:
                    # create the final path component as a file or a stream,
                    # depending on create_type
                    if create_type == COLLECTION:
                        item = Subcollection(self, pathcomponents[0])
                    else:
                        item = ArvadosFile(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # create new intermediate collection
                    item = Subcollection(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                else:
                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        else:
            # Empty leading component (e.g. "" or a trailing "/"): this stream.
            return self

    @synchronized
    def find(self, path: str) -> CollectionItem:
        """Get the item at the given path

        If `path` refers to a stream in this collection, returns a
        corresponding `Subcollection` object. If `path` refers to a file in
        this collection, returns a corresponding
        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
        this collection, returns `None`.

        Raises `arvados.errors.ArgumentError` if `path` is empty, and
        `NotADirectoryError` if `path` starts with `/` or a non-final
        component of `path` names a file.

        Arguments:

        * path: str --- The path to find within this collection.
        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        if pathcomponents[0] == '':
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

        item = self._items.get(pathcomponents[0])
        if item is None:
            return None
        elif len(pathcomponents) == 1:
            return item
        else:
            if isinstance(item, RichCollectionBase):
                if pathcomponents[1]:
                    return item.find(pathcomponents[1])
                else:
                    return item
            else:
                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

    @synchronized
    def mkdirs(self, path: str) -> 'Subcollection':
        """Create and return a subcollection at `path`

        If `path` exists within this collection, raises `FileExistsError`.
        Otherwise, creates a stream at that path and returns the
        corresponding `Subcollection`.
        """
        if self.find(path) is not None:
            raise IOError(errno.EEXIST, "Directory or file exists", path)

        return self.find_or_create(path, COLLECTION)

    def open(
            self,
            path: str,
            mode: str="r",
            encoding: Optional[str]=None
    ) -> IO:
        """Open a file-like object within the collection

        This method returns a file-like object that can read and/or write the
        file located at `path` within the collection. If you attempt to write
        a `path` that does not exist, the file is created with `find_or_create`.
        If the file cannot be opened for any other reason, this method raises
        `OSError` with an appropriate errno.

        Arguments:

        * path: str --- The path of the file to open within this collection

        * mode: str --- The mode to open this file. Supports all the same
          values as `builtins.open`.

        * encoding: str | None --- The text encoding of the file. Only used
          when the file is opened in text mode. The default is
          platform-dependent.
        """
        if not re.search(r'^[rwa][bt]?\+?$', mode):
            raise errors.ArgumentError("Invalid mode {!r}".format(mode))

        if mode[0] == 'r' and '+' not in mode:
            fclass = ArvadosFileReader
            arvfile = self.find(path)
        elif not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        else:
            fclass = ArvadosFileWriter
            arvfile = self.find_or_create(path, FILE)

        if arvfile is None:
            raise IOError(errno.ENOENT, "File not found", path)
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory", path)

        if mode[0] == 'w':
            arvfile.truncate(0)

        # The underlying reader/writer always works in binary; text mode is
        # layered on top with a buffered TextIOWrapper.
        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
        if 'b' not in mode:
            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
        return f

    def modified(self) -> bool:
        """Indicate whether this collection has uncommitted changes

        Returns the inverse of `committed`: `False` if this collection
        corresponds to a record loaded from the API server, `True` otherwise.
        """
        return not self.committed()

    @synchronized
    def committed(self):
        """Indicate whether this collection has an API server record

        Returns `True` if this collection corresponds to a record loaded from
        the API server, `False` otherwise.
        """
        return self._committed

    @synchronized
    def set_committed(self, value: bool=True):
        """Cache whether this collection has an API server record

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        Set this collection's cached "committed" flag to the given
        value and propagates it as needed: `True` is pushed down to every
        child item, `False` is pushed up to the parent.
        """
        if value == self._committed:
            return
        if value:
            for k,v in self._items.items():
                v.set_committed(True)
            self._committed = True
        else:
            self._committed = False
            if self.parent is not None:
                self.parent.set_committed(False)

    @synchronized
    def __iter__(self) -> Iterator[str]:
        """Iterate names of streams and files in this collection

        This method does not recurse. It only iterates the contents of this
        collection's corresponding stream.
        """
        return iter(self._items)

    @synchronized
    def __getitem__(self, k: str) -> CollectionItem:
        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection

        This method does not recurse. If you want to search a path, use
        `RichCollectionBase.find` instead.
        """
        return self._items[k]

    @synchronized
    def __contains__(self, k: str) -> bool:
        """Indicate whether this collection has an item with this name

        This method does not recurse. If you want to check a path, use
        `RichCollectionBase.exists` instead.
        """
        return k in self._items

    @synchronized
    def __len__(self):
        """Get the number of items directly contained in this collection

        This method does not recurse. It only counts the streams and files
        in this collection's corresponding stream.
        """
        return len(self._items)

    @must_be_writable
    @synchronized
    def __delitem__(self, p: str) -> None:
        """Delete an item from this collection's stream

        This method does not recurse. If you want to remove an item by a
        path, use `RichCollectionBase.remove` instead.
        """
        del self._items[p]
        self.set_committed(False)
        self.notify(DEL, self, p, None)

    @synchronized
    def keys(self) -> Iterator[str]:
        """Iterate names of streams and files in this collection

        This method does not recurse. It only iterates the contents of this
        collection's corresponding stream.
        """
        return self._items.keys()

    @synchronized
    def values(self) -> List[CollectionItem]:
        """Get a list of objects in this collection's stream

        The return value includes a `Subcollection` for every stream, and an
        `arvados.arvfile.ArvadosFile` for every file, directly within this
        collection's stream.  This method does not recurse.
        """
        return list(self._items.values())

    @synchronized
    def items(self) -> List[Tuple[str, CollectionItem]]:
        """Get a list of `(name, object)` tuples from this collection's stream

        The return value includes a `Subcollection` for every stream, and an
        `arvados.arvfile.ArvadosFile` for every file, directly within this
        collection's stream.  This method does not recurse.
        """
        return list(self._items.items())

    def exists(self, path: str) -> bool:
        """Indicate whether this collection includes an item at `path`

        This method returns `True` if `path` refers to a stream or file within
        this collection, else `False`.

        Arguments:

        * path: str --- The path to check for existence within this collection
        """
        return self.find(path) is not None

    @must_be_writable
    @synchronized
    def remove(self, path: str, recursive: bool=False) -> None:
        """Remove the file or stream at `path`

        Arguments:

        * path: str --- The path of the item to remove from the collection

        * recursive: bool --- Controls the method's behavior if `path` refers
          to a nonempty stream. If `False` (the default), this method raises
          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
          items under the stream.

        Raises `FileNotFoundError` if `path` does not exist, and
        `NotADirectoryError` if a non-final component of `path` is a file.
        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if item is None:
            raise IOError(errno.ENOENT, "File not found", path)
        if len(pathcomponents) == 1:
            if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
            del self._items[pathcomponents[0]]
            self.set_committed(False)
            self.notify(DEL, self, pathcomponents[0], item)
        else:
            # Only streams can contain further path components; report files
            # the same way find/find_or_create do rather than failing with
            # AttributeError.
            if not isinstance(item, RichCollectionBase):
                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
            item.remove(pathcomponents[1], recursive=recursive)

    def _clonefrom(self, source):
        for k,v in source.items():
            self._items[k] = v.clone(self, k)

    def clone(self):
        raise NotImplementedError()

    @must_be_writable
    @synchronized
    def add(
            self,
            source_obj: CollectionItem,
            target_name: str,
            overwrite: bool=False,
            reparent: bool=False,
    ) -> None:
        """Copy or move a file or subcollection object to this collection

        Arguments:

        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
          to add to this collection

        * target_name: str --- The path inside this collection where
          `source_obj` should be added.

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_name`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_name` will be replaced with `source_obj`.

        * reparent: bool --- Controls whether this method copies or moves
          `source_obj`. If `False` (the default), `source_obj` is copied into
          this collection. If `True`, `source_obj` is moved into this
          collection.
        """
        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)

        modified_from = None
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        if reparent:
            source_obj._reparent(self, target_name)
            item = source_obj
        else:
            item = source_obj.clone(self, target_name)

        self._items[target_name] = item
        self.set_committed(False)
        if not self._has_remote_blocks and source_obj.has_remote_blocks():
            self.set_has_remote_blocks(True)

        # Compare against None: an empty Subcollection is falsy via __len__,
        # and replacing one is still a modification, not an addition.
        if modified_from is not None:
            self.notify(MOD, self, target_name, (modified_from, item))
        else:
            self.notify(ADD, self, target_name, item)

    def _get_src_target(self, source, target_path, source_collection, create_dest):
        """Resolve `(source_obj, target_dir, target_name)` for copy/rename

        Raises `FileNotFoundError` if a string `source` does not exist or the
        target directory is missing (when `create_dest` is false), and
        `arvados.errors.ArgumentError` if no destination name can be derived.
        """
        if source_collection is None:
            source_collection = self

        # Find the object
        if isinstance(source, str):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found", source)
            sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None

        # Find parent collection the target path
        targetcomponents = target_path.split("/")

        # Determine the name to use.  When the target ends in "/" and the
        # source is an object (no path to borrow a name from), there is no
        # name; report that instead of failing with TypeError on None.
        if targetcomponents[-1]:
            target_name = targetcomponents[-1]
        elif sourcecomponents:
            target_name = sourcecomponents[-1]
        else:
            target_name = None

        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

        if create_dest:
            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        else:
            if len(targetcomponents) > 1:
                target_dir = self.find("/".join(targetcomponents[0:-1]))
            else:
                target_dir = self

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found", target_name)

        # Copying onto an existing stream by source path places the item
        # inside that stream, keeping the source's own name.
        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)

    @must_be_writable
    @synchronized
    def copy(
            self,
            source: Union[str, CollectionItem],
            target_path: str,
            source_collection: Optional['RichCollectionBase']=None,
            overwrite: bool=False,
    ) -> None:
        """Copy a file or subcollection object to this collection

        Arguments:

        * source: str | arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The file or subcollection to
          add to this collection. If `source` is a str, the object will be
          found by looking up this path from `source_collection` (see
          below).

        * target_path: str --- The path inside this collection where the
          source object should be added.

        * source_collection: arvados.collection.Collection | None --- The
          collection to find the source object from when `source` is a
          path. Defaults to the current collection (`self`).

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_path`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_path` will be replaced with `source_obj`.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
        target_dir.add(source_obj, target_name, overwrite, False)

    @must_be_writable
    @synchronized
    def rename(
            self,
            source: Union[str, CollectionItem],
            target_path: str,
            source_collection: Optional['RichCollectionBase']=None,
            overwrite: bool=False,
    ) -> None:
        """Move a file or subcollection object to this collection

        Arguments:

        * source: str | arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The file or subcollection to
          add to this collection. If `source` is a str, the object will be
          found by looking up this path from `source_collection` (see
          below).

        * target_path: str --- The path inside this collection where the
          source object should be added.

        * source_collection: arvados.collection.Collection | None --- The
          collection to find the source object from when `source` is a
          path. Defaults to the current collection (`self`).

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_path`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_path` will be replaced with `source_obj`.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
        if not source_obj.writable():
            raise IOError(errno.EROFS, "Source collection is read only", source)
        target_dir.add(source_obj, target_name, overwrite, True)

    def portable_manifest_text(self, stream_name: str=".") -> str:
        """Get the portable manifest text for this collection

        The portable manifest text is normalized, and does not include access
        tokens. This method does not flush outstanding blocks to Keep.

        Arguments:

        * stream_name: str --- The name to use for this collection's stream in
          the generated manifest. Default `'.'`.
        """
        return self._get_manifest_text(stream_name, True, True)

    @synchronized
    def manifest_text(
            self,
            stream_name: str=".",
            strip: bool=False,
            normalize: bool=False,
            only_committed: bool=False,
    ) -> str:
        """Get the manifest text for this collection

        Arguments:

        * stream_name: str --- The name to use for this collection's stream in
          the generated manifest. Default `'.'`.

        * strip: bool --- Controls whether or not the returned manifest text
          includes access tokens. If `False` (the default), the manifest text
          will include access tokens. If `True`, the manifest text will not
          include access tokens.

        * normalize: bool --- Controls whether or not the returned manifest
          text is normalized. Default `False`.

        * only_committed: bool --- Controls whether or not this method uploads
          pending data to Keep before building and returning the manifest text.
          If `False` (the default), this method will finish uploading all data
          to Keep, then return the final manifest. If `True`, this method will
          build and return a manifest that only refers to the data that has
          finished uploading at the time this method was called.
        """
        if not only_committed:
            self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=only_committed)

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.

        """

        if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # Data still in a local buffer block: skip it, or
                        # substitute the buffer block's current locator.
                        if only_committed:
                            continue
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(streams.LocatorAndRange(
                        loc,
                        KeepLocator(loc).size,
                        segment.segment_offset,
                        segment.range_size,
                    ))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(
                    stream_name=os.path.join(stream_name, dirname),
                    strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text

    @synchronized
    def _copy_remote_blocks(self, remote_blocks=None):
        """Scan through the entire collection and ask Keep to copy remote blocks.

        When accessing a remote collection, blocks will have a remote signature
        (+R instead of +A). Collect these signatures and request Keep to copy the
        blocks to the local cluster, returning local (+A) signatures.

        :remote_blocks:
          Shared cache of remote to local block mappings. This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories. Defaults to a new empty dict per call;
          the previous `{}` default was a single dict shared by every call.

        """
        if remote_blocks is None:
            remote_blocks = {}
        for item in self:
            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
        return remote_blocks

    @synchronized
    def diff(
            self,
            end_collection: 'RichCollectionBase',
            prefix: str=".",
            holding_collection: Optional['Collection']=None,
    ) -> ChangeList:
        """Build a list of differences between this collection and another

        Arguments:

        * end_collection: arvados.collection.RichCollectionBase --- A
          collection object with the desired end state. The returned diff
          list will describe how to go from the current collection object
          `self` to `end_collection`.

        * prefix: str --- The name to use for this collection's stream in
          the diff list. Default `'.'`.

        * holding_collection: arvados.collection.Collection | None --- A
          collection object used to hold objects for the returned diff
          list. By default, a new empty collection is created.
        """
        changes = []
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        for k in self:
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            if k in self:
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                else:
                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        return changes

    @must_be_writable
    @synchronized
    def apply(self, changes: ChangeList) -> None:
        """Apply a list of changes to this collection

        This method takes a list of changes generated by
        `RichCollectionBase.diff` and applies it to this
        collection. Afterward, the state of this collection object will
        match the state of `end_collection` passed to `diff`. If a change
        conflicts with a local change, it will be saved to an alternate path
        indicating the conflict.

        Arguments:

        * changes: arvados.collection.ChangeList --- The list of differences
          generated by `RichCollectionBase.diff`.
        """
        if changes:
            self.set_committed(False)
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                elif event_type == MOD:
                    # Local doesn't match the "start" value or local
                    # is missing (presumably deleted) so save change
                    # to conflict file.  Don't do this for TOK events
                    # which means the file didn't change but only had
                    # tokens updated.
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.

    def portable_data_hash(self) -> str:
        """Get the portable data hash for this collection's manifest"""
        if self._manifest_locator and self.committed():
            # If the collection is already saved on the API server, and it's committed
            # then return API server's PDH response.
            return self._portable_data_hash
        else:
            # PDH format: md5 hex digest of the portable manifest, "+", its byte length.
            stripped = self.portable_manifest_text().encode()
            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

    @synchronized
    def subscribe(self, callback: ChangeCallback) -> None:
        """Set a notify callback for changes to this collection

        Arguments:

        * callback: arvados.collection.ChangeCallback --- The callable to
          call each time the collection is changed.

        Raises `arvados.errors.ArgumentError` if a callback is already set.
        """
        if self._callback is None:
            self._callback = callback
        else:
            raise errors.ArgumentError("A callback is already set on this collection.")

    @synchronized
    def unsubscribe(self) -> None:
        """Remove any notify callback set for changes to this collection"""
        if self._callback is not None:
            self._callback = None

    @synchronized
    def notify(
            self,
            event: ChangeType,
            collection: 'RichCollectionBase',
            name: str,
            item: CollectionItem,
    ) -> None:
        """Notify any subscribed callback about a change to this collection

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        If a callback has been registered with `RichCollectionBase.subscribe`,
        it will be called with information about a change to this collection.
        Then this notification will be propagated to this collection's root.

        Arguments:

        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
          the collection.

        * collection: arvados.collection.RichCollectionBase --- The
          collection that was modified.

        * name: str --- The name of the file or stream within `collection` that
          was modified.

        * item: arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- For ADD events, the new
          contents at `name` within `collection`; for DEL events, the
          item that was removed. For MOD and TOK events, a 2-tuple of
          the previous item and the new item (may be the same object
          or different, depending on whether the action involved it
          being modified in place or replaced).

        """
        if self._callback:
            self._callback(event, collection, name, item)
        self.root_collection().notify(event, collection, name, item)

    @synchronized
    def __eq__(self, other: Any) -> bool:
        """Indicate whether this collection object is equal to another

        Two collections are equal when they hold the same names and each
        named item compares equal.
        """
        if other is self:
            return True
        if not isinstance(other, RichCollectionBase):
            return False
        if len(self._items) != len(other):
            return False
        for k in self._items:
            if k not in other:
                return False
            if self._items[k] != other[k]:
                return False
        return True

    def __ne__(self, other: Any) -> bool:
        """Indicate whether this collection object is not equal to another"""
        return not self.__eq__(other)

    @synchronized
    def flush(self) -> None:
        """Upload any pending data to Keep"""
        for e in self.values():
            e.flush()
Base class for Collection classes
def writable(self) -> bool:
    """Report whether this collection object accepts modifications

    Concrete subclasses override this: a `CollectionReader` answers
    `False`, every other collection class answers `True`. This base
    implementation only raises `NotImplementedError`.
    """
    raise NotImplementedError
Indicate whether this collection object can be modified
This method returns False
if this object is a CollectionReader
,
else True
.
def root_collection(self) -> 'Collection':
    """Return the top-level collection that owns this object

    When called on a subcollection obtained through `Collection.find`,
    the result is the `Collection` it was found in. Subclasses must
    override this; the base implementation raises `NotImplementedError`.
    """
    raise NotImplementedError
Get this collection’s root collection object
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the source Collection object.
def stream_name(self) -> str:
    """Return the manifest stream name this collection object represents

    For a subcollection obtained through `Collection.find`, this is the
    name of the stream that was opened. Subclasses must override this;
    the base implementation raises `NotImplementedError`.
    """
    raise NotImplementedError
Get the name of the manifest stream represented by this collection
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the name of the stream you opened.
@synchronized
def has_remote_blocks(self) -> bool:
    """Indicate whether the collection refers to remote data

    Returns `True` if the collection manifest includes any Keep locators
    with a remote hint (`+R`), else `False`. The flag cached on this
    object is consulted first; otherwise each direct child is asked in
    turn, stopping at the first one that reports remote blocks.
    """
    if self._has_remote_blocks:
        return True
    return any(self[name].has_remote_blocks() for name in self)
Indicate whether the collection refers to remote data
Returns True
if the collection manifest includes any Keep locators
with a remote hint (+R
), else False
.
@synchronized
def set_has_remote_blocks(self, val: bool) -> None:
    """Record whether this collection refers to remote blocks

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    Stores `val` as this collection's cached "has remote blocks" flag and
    forwards the same value up to the parent collection, if any.
    """
    self._has_remote_blocks = val
    parent = self.parent
    if not parent:
        return
    parent.set_has_remote_blocks(val)
Cache whether this collection refers to remote blocks
Set this collection’s cached “has remote blocks” flag to the given value.
@must_be_writable
@synchronized
def find_or_create(
        self,
        path: str,
        create_type: CreateType,
) -> CollectionItem:
    """Get the item at the given path, creating it if necessary

    Returns the existing `Subcollection` or `arvados.arvfile.ArvadosFile`
    at `path` when there is one. Otherwise a new object is created there,
    along with any missing parent streams, and returned. An empty path
    (or an empty leading component) returns this collection itself.

    Arguments:

    * path: str --- The path to find or create within this collection.

    * create_type: Literal[COLLECTION, FILE] --- What to create at `path`
      when nothing exists there: `COLLECTION` creates a stream and returns
      its `Subcollection`; `FILE` creates an empty file and returns its
      `arvados.arvfile.ArvadosFile`.

    Raises `NotADirectoryError` if an intermediate component of `path`
    names a file.
    """
    head, sep, rest = path.partition("/")
    if not head:
        # Empty leading component: the path names this collection.
        return self

    item = self._items.get(head)
    if not sep:
        # Final component: create an object of the requested kind.
        if item is None:
            if create_type == COLLECTION:
                item = Subcollection(self, head)
            else:
                item = ArvadosFile(self, head)
            self._items[head] = item
            self.set_committed(False)
            self.notify(ADD, self, head, item)
        return item

    # Intermediate component: it must be, or become, a stream.
    if item is None:
        item = Subcollection(self, head)
        self._items[head] = item
        self.set_committed(False)
        self.notify(ADD, self, head, item)
    if not isinstance(item, RichCollectionBase):
        raise IOError(errno.ENOTDIR, "Not a directory", head)
    return item.find_or_create(rest, create_type)
Get the item at the given path, creating it if necessary
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method creates a new object and returns
it, creating parent streams as needed. The type of object created is
determined by the value of create_type
.
Arguments:
path: str — The path to find or create within this collection.
create_type: Literal[COLLECTION, FILE] — The type of object to create at
path
if one does not exist. Passing COLLECTION
creates a stream and returns the corresponding Subcollection
. Passing FILE
creates a new file and returns the corresponding arvados.arvfile.ArvadosFile
.
@synchronized
def find(self, path: str) -> Optional[CollectionItem]:
    """Get the item at the given path

    If `path` refers to a stream in this collection, returns a
    corresponding `Subcollection` object. If `path` refers to a file in
    this collection, returns a corresponding
    `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
    this collection, returns `None`.

    Raises `arvados.errors.ArgumentError` if `path` is empty.
    Raises `NotADirectoryError` if `path` has an empty leading component
    (for example, it starts with `/`), or if a component that must be a
    stream names a file.

    Arguments:

    * path: str --- The path to find within this collection.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    pathcomponents = path.split("/", 1)
    if pathcomponents[0] == '':
        raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

    item = self._items.get(pathcomponents[0])
    if item is None:
        return None
    if len(pathcomponents) == 1:
        return item
    if not isinstance(item, RichCollectionBase):
        # A file cannot contain further path components.
        raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
    if pathcomponents[1]:
        return item.find(pathcomponents[1])
    # Trailing slash ("name/"): return the stream itself.
    return item
Get the item at the given path
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method returns None (it raises NotADirectoryError only when a component that must be a stream names a file)
.
Arguments:
- path: str — The path to find within this collection.
@synchronized
def mkdirs(self, path: str) -> 'Subcollection':
    """Create and return a subcollection at `path`

    If `path` exists within this collection, raises `FileExistsError`.
    Otherwise, creates a stream at that path (and any missing parent
    streams) and returns the corresponding `Subcollection`.
    """
    # Test for absence by identity: `!= None` would dispatch to the found
    # item's overloaded comparison operators just to check for None.
    if self.find(path) is not None:
        raise IOError(errno.EEXIST, "Directory or file exists", path)

    return self.find_or_create(path, COLLECTION)
Create and return a subcollection at path
If path
exists within this collection, raises FileExistsError
.
Otherwise, creates a stream at that path and returns the
corresponding Subcollection
.
def open(
        self,
        path: str,
        mode: str="r",
        encoding: Optional[str]=None
) -> IO:
    """Open a file-like object within the collection

    This method returns a file-like object that can read and/or write the
    file located at `path` within the collection. If you attempt to write
    a `path` that does not exist, the file is created with `find_or_create`.
    If the file cannot be opened for any other reason, this method raises
    `OSError` with an appropriate errno.

    Arguments:

    * path: str --- The path of the file to open within this collection

    * mode: str --- The mode to open this file: `r`, `w`, or `a`,
      optionally followed by `b` or `t`, optionally followed by `+`
      (the same meanings as in `builtins.open`). Any other value raises
      `arvados.errors.ArgumentError`.

    * encoding: str | None --- The text encoding of the file. Only used
      when the file is opened in text mode. The default is
      platform-dependent.

    """
    # fullmatch rather than search(r'^...$'): `$` also matches just before
    # a trailing newline, which let modes such as "r\n" through and produced
    # a malformed binary mode string below.
    if not re.fullmatch(r'[rwa][bt]?\+?', mode):
        raise errors.ArgumentError("Invalid mode {!r}".format(mode))

    if mode[0] == 'r' and '+' not in mode:
        fclass = ArvadosFileReader
        arvfile = self.find(path)
    elif not self.writable():
        raise IOError(errno.EROFS, "Collection is read only")
    else:
        fclass = ArvadosFileWriter
        arvfile = self.find_or_create(path, FILE)

    if arvfile is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if not isinstance(arvfile, ArvadosFile):
        raise IOError(errno.EISDIR, "Is a directory", path)

    if mode[0] == 'w':
        arvfile.truncate(0)

    # The underlying Arvados file object always works in binary mode;
    # text mode is layered on top with a buffered TextIOWrapper.
    binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
    f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
    if 'b' not in mode:
        bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
        f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
    return f
Open a file-like object within the collection
This method returns a file-like object that can read and/or write the
file located at path
within the collection. If you attempt to write
a path
that does not exist, the file is created with find_or_create
.
If the file cannot be opened for any other reason, this method raises
OSError
with an appropriate errno.
Arguments:
path: str — The path of the file to open within this collection
mode: str — The mode to open this file. Supports all the same values as
builtins.open
.
encoding: str | None — The text encoding of the file. Only used when the file is opened in text mode. The default is platform-dependent.
def modified(self) -> bool:
    """Indicate whether this collection has uncommitted changes

    This is the negation of `committed()`: it is `False` when this
    collection corresponds to a record loaded from the API server, and
    `True` otherwise.
    """
    is_committed = self.committed()
    return not is_committed
Indicate whether this collection has an API server record
Returns False
if this collection corresponds to a record loaded from
the API server, True
otherwise.
@synchronized
def committed(self):
    """Indicate whether this collection matches an API server record

    Returns the cached "committed" flag: `True` if this collection
    corresponds to a record loaded from the API server, `False` otherwise.
    """
    flag = self._committed
    return flag
Indicate whether this collection has an API server record
Returns True
if this collection corresponds to a record loaded from
the API server, False
otherwise.
@synchronized
def set_committed(self, value: bool=True):
    """Cache whether this collection has an API server record

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    Setting the flag to the value it already holds does nothing. Setting
    it true also calls `set_committed(True)` on every direct child.
    Setting it false clears this collection's flag and then the parent's,
    so the change propagates toward the root.
    """
    if value == self._committed:
        return
    if not value:
        self._committed = False
        if self.parent is not None:
            self.parent.set_committed(False)
        return
    for child in self._items.values():
        child.set_committed(True)
    self._committed = True
Cache whether this collection has an API server record
Set this collection’s cached “committed” flag to the given value and propagates it as needed.
@synchronized
def keys(self) -> Iterator[str]:
    """Iterate names of streams and files in this collection

    Only the direct contents of this collection's stream are included;
    nested streams are not descended into.
    """
    entries = self._items
    return entries.keys()
Iterate names of streams and files in this collection
This method does not recurse. It only iterates the contents of this collection’s corresponding stream.
@synchronized
def values(self) -> List[CollectionItem]:
    """Get a list of objects in this collection's stream

    Each stream directly inside this collection appears as a
    `Subcollection`, and each file as an `arvados.arvfile.ArvadosFile`.
    Nested streams are not descended into.
    """
    return [*self._items.values()]
Get a list of objects in this collection’s stream
The return value includes a Subcollection
for every stream, and an
arvados.arvfile.ArvadosFile
for every file, directly within this
collection’s stream. This method does not recurse.
@synchronized
def items(self) -> List[Tuple[str, CollectionItem]]:
    """Get a list of `(name, object)` tuples from this collection's stream

    Each stream directly inside this collection is paired with a
    `Subcollection`, and each file with an `arvados.arvfile.ArvadosFile`.
    Nested streams are not descended into.
    """
    return [*self._items.items()]
Get a list of (name, object)
tuples from this collection’s stream
The return value includes a Subcollection
for every stream, and an
arvados.arvfile.ArvadosFile
for every file, directly within this
collection’s stream. This method does not recurse.
def exists(self, path: str) -> bool:
    """Indicate whether this collection includes an item at `path`

    Returns `True` when `path` names a stream or file within this
    collection, `False` otherwise.

    Arguments:

    * path: str --- The path to check for existence within this collection
    """
    found = self.find(path)
    return found is not None
Indicate whether this collection includes an item at path
This method returns True
if path
refers to a stream or file within
this collection, else False
.
Arguments:
- path: str — The path to check for existence within this collection
@must_be_writable
@synchronized
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the file or stream at `path`

    Arguments:

    * path: str --- The path of the item to remove from the collection

    * recursive: bool --- Controls the method's behavior if `path` refers
      to a nonempty stream. If `False` (the default), this method raises
      `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
      items under the stream.

    Raises `arvados.errors.ArgumentError` if `path` is empty,
    `FileNotFoundError` if nothing exists at `path`, and
    `NotADirectoryError` if a component of `path` that must be a stream
    names a file.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    pathcomponents = path.split("/", 1)
    item = self._items.get(pathcomponents[0])
    if item is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if len(pathcomponents) == 1:
        if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
            raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
        del self._items[pathcomponents[0]]
        self.set_committed(False)
        self.notify(DEL, self, pathcomponents[0], item)
    elif isinstance(item, RichCollectionBase):
        item.remove(pathcomponents[1], recursive=recursive)
    else:
        # An intermediate component names a file, which has no children.
        # Report it the same way `find` does instead of calling a
        # nonexistent `remove` on the file object.
        raise IOError(errno.ENOTDIR, "Not a directory", path)
Remove the file or stream at path
Arguments:
path: str — The path of the item to remove from the collection
recursive: bool — Controls the method’s behavior if
path
refers to a nonempty stream. If False
(the default), this method raises OSError
with errno ENOTEMPTY
. If True
, this method removes all items under the stream.
@must_be_writable
@synchronized
def add(
        self,
        source_obj: CollectionItem,
        target_name: str,
        overwrite: bool=False,
        reparent: bool=False,
) -> None:
    """Copy or move a file or subcollection object to this collection

    Arguments:

    * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
      to add to this collection

    * target_name: str --- The path inside this collection where
      `source_obj` should be added.

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_name`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_name` will be replaced with `source_obj`.

    * reparent: bool --- Controls whether this method copies or moves
      `source_obj`. If `False` (the default), `source_obj` is copied into
      this collection. If `True`, `source_obj` is moved into this
      collection.

    Subscribers are notified with a `MOD` event carrying
    `(replaced_item, new_item)` when an existing item is overwritten, and
    with an `ADD` event otherwise.
    """
    modified_from = None
    if target_name in self:
        if not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)
        modified_from = self[target_name]

    # Actually make the move or copy.
    if reparent:
        source_obj._reparent(self, target_name)
        item = source_obj
    else:
        item = source_obj.clone(self, target_name)

    self._items[target_name] = item
    self.set_committed(False)
    if not self._has_remote_blocks and source_obj.has_remote_blocks():
        self.set_has_remote_blocks(True)

    # Compare against None explicitly: collections define __len__, so an
    # empty Subcollection being replaced is falsy and would otherwise be
    # misreported as an ADD.
    if modified_from is not None:
        self.notify(MOD, self, target_name, (modified_from, item))
    else:
        self.notify(ADD, self, target_name, item)
Copy or move a file or subcollection object to this collection
Arguments:
source_obj: arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection
target_name: str — The path inside this collection where
source_obj
should be added.
overwrite: bool — Controls the behavior of this method when the collection already contains an object at
target_name
. If False
(the default), this method will raise FileExistsError
. If True
, the object at target_name
will be replaced with source_obj
.
reparent: bool — Controls whether this method copies or moves
source_obj
. If False
(the default), source_obj
is copied into this collection. If True
, source_obj
is moved into this collection.
@must_be_writable
@synchronized
def copy(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Copy a file or subcollection object to this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      copy. A str is treated as a path and looked up in
      `source_collection` (see below).

    * target_path: str --- The path inside this collection where the
      copy should be placed.

    * source_collection: arvados.collection.Collection | None --- The
      collection in which a str `source` is looked up. Defaults to the
      current collection (`self`).

    * overwrite: bool --- What to do when this collection already holds
      an object at `target_path`: if `False` (the default), raise
      `FileExistsError`; if `True`, replace it with the source object.
    """
    src, dest_dir, dest_name = self._get_src_target(source, target_path, source_collection, True)
    dest_dir.add(src, dest_name, overwrite=overwrite, reparent=False)
Copy a file or subcollection object to this collection
Arguments:
source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If
source
is a str, the object will be found by looking up this path from source_collection
(see below).
target_path: str — The path inside this collection where the source object should be added.
source_collection: Collection | None — The collection to find the source object from when
source
is a path. Defaults to the current collection (self
).
overwrite: bool — Controls the behavior of this method when the collection already contains an object at
target_path
. If False
(the default), this method will raise FileExistsError
. If True
, the object at target_path
will be replaced with the source object
.
@must_be_writable
@synchronized
def rename(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Move a file or subcollection object to this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      move. A str is treated as a path and looked up in
      `source_collection` (see below).

    * target_path: str --- The path inside this collection where the
      moved object should be placed.

    * source_collection: arvados.collection.Collection | None --- The
      collection in which a str `source` is looked up. Defaults to the
      current collection (`self`).

    * overwrite: bool --- What to do when this collection already holds
      an object at `target_path`: if `False` (the default), raise
      `FileExistsError`; if `True`, replace it with the source object.

    Raises `OSError` with errno `EROFS` if the source object is not
    writable, since moving it would modify its original location.
    """
    src, dest_dir, dest_name = self._get_src_target(source, target_path, source_collection, False)
    if not src.writable():
        raise IOError(errno.EROFS, "Source collection is read only", source)
    dest_dir.add(src, dest_name, overwrite=overwrite, reparent=True)
Move a file or subcollection object to this collection
Arguments:
source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If
source
is a str, the object will be found by looking up this path from source_collection
(see below).
target_path: str — The path inside this collection where the source object should be added.
source_collection: Collection | None — The collection to find the source object from when
source
is a path. Defaults to the current collection (self
).
overwrite: bool — Controls the behavior of this method when the collection already contains an object at
target_path
. If False
(the default), this method will raise FileExistsError
. If True
, the object at target_path
will be replaced with the source object
.
def portable_manifest_text(self, stream_name: str=".") -> str:
    """Get the portable manifest text for this collection

    The result is normalized and carries no access tokens. Outstanding
    blocks are not flushed to Keep first.

    Arguments:

    * stream_name: str --- The name given to this collection's stream in
      the generated manifest. Default `'.'`.
    """
    # Positional arguments after the stream name are (strip, normalize).
    strip_tokens = normalize = True
    return self._get_manifest_text(stream_name, strip_tokens, normalize)
Get the portable manifest text for this collection
The portable manifest text is normalized, and does not include access tokens. This method does not flush outstanding blocks to Keep.
Arguments:
- stream_name: str — The name to use for this collection’s stream in
the generated manifest. Default
'.'
.
@synchronized
def manifest_text(
        self,
        stream_name: str=".",
        strip: bool=False,
        normalize: bool=False,
        only_committed: bool=False,
) -> str:
    """Get the manifest text for this collection

    Arguments:

    * stream_name: str --- The name given to this collection's stream in
      the generated manifest. Default `'.'`.

    * strip: bool --- If `True`, access tokens are removed from the
      returned manifest text. Default `False` (tokens are kept).

    * normalize: bool --- If `True`, the returned manifest text is
      normalized. Default `False`.

    * only_committed: bool --- If `False` (the default), every pending
      block is uploaded to Keep first and the complete manifest is
      returned. If `True`, nothing is uploaded and the manifest refers
      only to data that had finished uploading when this method was
      called.
    """
    needs_flush = not only_committed
    if needs_flush:
        self._my_block_manager().commit_all()
    manifest = self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=only_committed)
    return manifest
Get the manifest text for this collection
Arguments:
stream_name: str — The name to use for this collection’s stream in the generated manifest. Default
'.'
.
strip: bool — Controls whether or not the returned manifest text includes access tokens. If
False
(the default), the manifest text will include access tokens. If True
, the manifest text will not include access tokens.
normalize: bool — Controls whether or not the returned manifest text is normalized. Default
False
.
only_committed: bool — Controls whether or not this method uploads pending data to Keep before building and returning the manifest text. If
False
(the default), this method will finish uploading all data to Keep, then return the final manifest. If True
, this method will build and return a manifest that only refers to the data that has finished uploading at the time this method was called.
@synchronized
def diff(
        self,
        end_collection: 'RichCollectionBase',
        prefix: str=".",
        holding_collection: Optional['Collection']=None,
) -> ChangeList:
    """Build a list of differences between this collection and another

    Arguments:

    * end_collection: arvados.collection.RichCollectionBase --- The
      collection describing the desired end state. The returned list
      describes how to get from `self` to `end_collection`.

    * prefix: str --- The name given to this collection's stream in the
      diff list. Default `'.'`.

    * holding_collection: arvados.collection.Collection | None --- The
      collection that owns the cloned objects placed in the diff list.
      When omitted, a new empty collection is created for this.

    Entries are emitted in this order: a DEL for each name only in `self`,
    then, for each name in `end_collection`, an ADD (name only there), a
    recursive diff (both sides are Subcollections), a MOD (items differ),
    or a TOK (items compare equal).
    """
    if holding_collection is None:
        holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
    changes = []

    def snapshot(obj):
        return obj.clone(holding_collection, "")

    for name in self:
        if name not in end_collection:
            changes.append((DEL, os.path.join(prefix, name), snapshot(self[name])))

    for name in end_collection:
        subpath = os.path.join(prefix, name)
        theirs = end_collection[name]
        if name not in self:
            changes.append((ADD, subpath, snapshot(theirs)))
            continue
        ours = self[name]
        if isinstance(theirs, Subcollection) and isinstance(ours, Subcollection):
            changes.extend(ours.diff(theirs, subpath, holding_collection))
        elif theirs != ours:
            changes.append((MOD, subpath, snapshot(ours), snapshot(theirs)))
        else:
            changes.append((TOK, subpath, snapshot(ours), snapshot(theirs)))
    return changes
Build a list of differences between this collection and another
Arguments:
end_collection: RichCollectionBase — A collection object with the desired end state. The returned diff list will describe how to go from the current collection object
self
to end_collection
.
prefix: str — The name to use for this collection’s stream in the diff list. Default
'.'
.
holding_collection: Collection | None — A collection object used to hold objects for the returned diff list. By default, a new empty collection is created.
@must_be_writable
@synchronized
def apply(self, changes: ChangeList) -> None:
    """Apply a list of changes to this collection

    This method takes a list of changes generated by
    `RichCollectionBase.diff` and applies it to this
    collection. Afterward, the state of this collection object will
    match the state of `end_collection` passed to `diff`. If a change
    conflicts with a local change, it will be saved to an alternate path
    indicating the conflict.

    Arguments:

    * changes: arvados.collection.ChangeList --- The list of differences
      generated by `RichCollectionBase.diff`.
    """
    if changes:
        self.set_committed(False)
    for change in changes:
        event_type = change[0]
        path = change[1]
        initial = change[2]
        local = self.find(path)
        # Conflicting changes are saved next to the original path, tagged
        # with the current UTC time.
        conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                 time.gmtime()))
        if event_type == ADD:
            if local is None:
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
            elif local != initial:
                # There is already local file and it is different:
                # save change to conflict file.
                self.copy(initial, conflictpath)
        elif event_type == MOD or event_type == TOK:
            final = change[3]
            if local == initial:
                # Local matches the "initial" item so it has not
                # changed locally and is safe to update.
                if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                    # Replace contents of local file with new contents
                    local.replace_contents(final)
                else:
                    # Overwrite path with new item; this can happen if
                    # path was a file and is now a collection or vice versa
                    self.copy(final, path, overwrite=True)
            elif event_type == MOD:
                # Local doesn't match the "start" value or local
                # is missing (presumably deleted) so save change
                # to conflict file.  Don't do this for TOK events
                # which means the file didn't change but only had
                # tokens updated.
                self.copy(final, conflictpath)
        elif event_type == DEL:
            if local == initial:
                # Local item matches "initial" value, so it is safe to remove.
                self.remove(path, recursive=True)
            # else, the file is modified or already removed, in either
            # case we don't want to try to remove it.
Apply a list of changes to this collection
This method takes a list of changes generated by `RichCollectionBase.diff` and applies it to this
collection. Afterward, the state of this collection object will
match the state of `end_collection` passed to `diff`. If a change
conflicts with a local change, it will be saved to an alternate path
indicating the conflict.
Arguments:
- changes: arvados.collection.ChangeList — The list of differences generated by `RichCollectionBase.diff`.
def portable_data_hash(self) -> str:
    """Get the portable data hash for this collection's manifest

    When this instance has a manifest locator and no uncommitted changes,
    the portable data hash remembered from the API server is returned
    unchanged. Otherwise the hash is computed locally from
    `portable_manifest_text()` as `<md5 hex digest>+<length in bytes>`.
    """
    if not (self._manifest_locator and self.committed()):
        manifest_bytes = self.portable_manifest_text().encode()
        digest = hashlib.md5(manifest_bytes).hexdigest()
        return '{}+{}'.format(digest, len(manifest_bytes))
    # Saved and unchanged: trust the PDH the API server reported.
    return self._portable_data_hash
Get the portable data hash for this collection’s manifest
@synchronized
def subscribe(self, callback: ChangeCallback) -> None:
    """Set a notify callback for changes to this collection

    Only one callback may be registered at a time; call `unsubscribe`
    first to replace an existing one.

    Arguments:

    * callback: arvados.collection.ChangeCallback --- The callable to
      call each time the collection is changed.

    Raises `arvados.errors.ArgumentError` if a callback is already set.
    """
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback
Set a notify callback for changes to this collection
Arguments:
- callback: arvados.collection.ChangeCallback — The callable to call each time the collection is changed.
@synchronized
def unsubscribe(self) -> None:
    """Remove any notify callback set for changes to this collection

    Calling this when no callback is registered does nothing.
    """
    if self._callback is None:
        return
    self._callback = None
Remove any notify callback set for changes to this collection
@synchronized
def notify(
    self,
    event: ChangeType,
    collection: 'RichCollectionBase',
    name: str,
    item: CollectionItem,
) -> None:
    """Notify any subscribed callback about a change to this collection

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    If a callback has been registered with `RichCollectionBase.subscribe`,
    it will be called with information about a change to this collection.
    Then this notification will be propagated to this collection's root.

    Arguments:

    * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
      the collection.

    * collection: arvados.collection.RichCollectionBase --- The
      collection that was modified.

    * name: str --- The name of the file or stream within `collection` that
      was modified.

    * item: arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- For ADD events, the new
      contents at `name` within `collection`; for DEL events, the
      item that was removed. For MOD and TOK events, a 2-tuple of
      the previous item and the new item (may be the same object
      or different, depending on whether the action involved it
      being modified in place or replaced).

    """
    callback = self._callback
    if callback:
        callback(event, collection, name, item)
    # Forward the same event upward so the root collection's subscriber
    # also hears about changes made in nested subcollections.
    root = self.root_collection()
    root.notify(event, collection, name, item)
Notify any subscribed callback about a change to this collection
If a callback has been registered with `RichCollectionBase.subscribe`, it will be called with information about a change to this collection.
Then this notification will be propagated to this collection’s root.
Arguments:
event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.
collection: RichCollectionBase — The collection that was modified.
name: str — The name of the file or stream within `collection` that was modified.
item: arvados.arvfile.ArvadosFile | Subcollection — For ADD events, the new contents at `name` within `collection`; for DEL events, the item that was removed. For MOD and TOK events, a 2-tuple of the previous item and the new item (may be the same object or different, depending on whether the action involved it being modified in place or replaced).
@synchronized
def flush(self) -> None:
    """Upload any pending data to Keep

    Asks every direct child of this collection (files and
    subcollections) to flush its own pending data.
    """
    children = self.values()
    for child in children:
        child.flush()
Upload any pending data to Keep
Inherited Members
class Collection(RichCollectionBase):
    """Read and manipulate an Arvados collection

    This class provides a high-level interface to create, read, and update
    Arvados collections and their contents. Refer to the Arvados Python SDK
    cookbook for [an introduction to using the Collection class][cookbook].

    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
    """

    def __init__(self, manifest_locator_or_text: Optional[str]=None,
                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
                 keep_client: Optional['arvados.keep.KeepClient']=None,
                 num_retries: int=10,
                 parent: Optional['Collection']=None,
                 apiconfig: Optional[Mapping[str, str]]=None,
                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
                 replication_desired: Optional[int]=None,
                 storage_classes_desired: Optional[List[str]]=None,
                 put_threads: Optional[int]=None):
        """Initialize a Collection object

        Arguments:

        * manifest_locator_or_text: str | None --- This string can contain a
          collection manifest text, portable data hash, or UUID. When given a
          portable data hash or UUID, this instance will load a collection
          record from the API server. Otherwise, this instance will represent a
          new collection without an API server record. The default value `None`
          instantiates a new collection with an empty manifest.

        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
          Arvados API client object this instance uses to make requests. If
          none is given, this instance creates its own client using the
          settings from `apiconfig` (see below). If your client instantiates
          many Collection objects, you can help limit memory utilization by
          calling `arvados.api.api` to construct an
          `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
          for every Collection.

        * keep_client: arvados.keep.KeepClient | None --- The Keep client
          object this instance uses to make requests. If none is given, this
          instance creates its own client using its `api_client`.

        * num_retries: int --- The number of times that client requests are
          retried. Default 10.

        * parent: arvados.collection.Collection | None --- The parent Collection
          object of this instance, if any. This argument is primarily used by
          other Collection methods; user client code shouldn't need to use it.

        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
          Collection object constructs one from these settings. If no
          mapping is provided, calls `arvados.config.settings` to get these
          parameters from user configuration.

        * block_manager: arvados.arvfile._BlockManager | None --- The
          _BlockManager object used by this instance to coordinate reading
          and writing Keep data blocks. If none is given, this instance
          constructs its own. This argument is primarily used by other
          Collection methods; user client code shouldn't need to use it.

        * replication_desired: int | None --- This controls both the value of
          the `replication_desired` field on API collection records saved by
          this class, as well as the number of Keep services that the object
          writes new data blocks to. If none is given, uses the default value
          configured for the cluster.

        * storage_classes_desired: list[str] | None --- This controls both
          the value of the `storage_classes_desired` field on API collection
          records saved by this class, as well as selecting which specific
          Keep services the object writes new data blocks to. If none is
          given, defaults to an empty list.

        * put_threads: int | None --- The number of threads to run
          simultaneously to upload data blocks to Keep. This value is used when
          building a new `block_manager`. It is unused when a `block_manager`
          is provided.

        Raises `arvados.errors.ArgumentError` if `storage_classes_desired` is
        not a list, if `manifest_locator_or_text` is not recognized, or if the
        manifest text cannot be parsed.
        """

        if storage_classes_desired and type(storage_classes_desired) is not list:
            raise errors.ArgumentError("storage_classes_desired must be list type.")

        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client

        # Use the keep client from ThreadSafeAPIClient
        if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
            self._keep_client = self._api_client.keep

        self._block_manager = block_manager
        self.replication_desired = replication_desired
        self._storage_classes_desired = storage_classes_desired
        self.put_threads = put_threads

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._portable_data_hash = None
        self._api_response = None
        self._token_refresh_timestamp = 0

        self.lock = threading.RLock()
        self.events = None

        if manifest_locator_or_text:
            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                # Portable data hash: the record is loaded by _populate().
                self._manifest_locator = manifest_locator_or_text
            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
                # A UUID from another cluster means its blocks must be
                # copied locally before saving (see save()/save_new()).
                if not self._has_local_collection_uuid():
                    self._has_remote_blocks = True
            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
                if '+R' in self._manifest_text:
                    self._has_remote_blocks = True
            else:
                raise errors.ArgumentError(
                    "Argument to CollectionReader is not a manifest or a collection UUID")

            try:
                self._populate()
            except errors.SyntaxError as e:
                # Interpolate the parser's message rather than passing it as
                # a second exception argument.
                raise errors.ArgumentError("Error processing manifest text: {}".format(e)) from None

    def storage_classes_desired(self) -> List[str]:
        """Get this collection's `storage_classes_desired` value"""
        return self._storage_classes_desired or []

    def root_collection(self) -> 'Collection':
        # A Collection is always the root of its own tree.
        return self

    def get_properties(self) -> Properties:
        """Get this collection's properties

        This method always returns a dict. If this collection object does not
        have an associated API record, or that record does not have any
        properties set, this method returns an empty dict.
        """
        # .get(): a partial API record (e.g. one fetched with a field
        # selection) may not carry "properties" at all.
        if self._api_response and self._api_response.get("properties"):
            return self._api_response["properties"]
        return {}

    def get_trash_at(self) -> Optional[datetime.datetime]:
        """Get this collection's `trash_at` field

        This method parses the `trash_at` field of the collection's API
        record and returns a datetime from it. If that field is not set, or
        this collection object does not have an associated API record,
        returns None. An unparseable value also returns None.
        """
        trash_at = self._api_response.get("trash_at") if self._api_response else None
        if not trash_at:
            return None
        try:
            return ciso8601.parse_datetime(trash_at)
        except ValueError:
            return None

    def stream_name(self) -> str:
        # The root collection is always the "." stream in a manifest.
        return "."

    def writable(self) -> bool:
        return True

    @synchronized
    @retry_method
    def update(
            self,
            other: Optional['Collection']=None,
            num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it. If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
        """

        # Block signatures are refreshed at most once an hour (seconds).
        token_refresh_period = 60*60
        time_since_last_token_refresh = (time.time() - self._token_refresh_timestamp)
        upstream_response = None

        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")

            # Content addressed by PDH cannot change upstream; only reload
            # when the block tokens are due for a refresh.
            if re.match(arvados.util.portable_data_hash_pattern, self._manifest_locator) and time_since_last_token_refresh < token_refresh_period:
                return

            upstream_response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            other = CollectionReader(upstream_response["manifest_text"])

        if self.committed():
            # 1st case, no local changes, content is the same
            if self.portable_data_hash() == other.portable_data_hash() and time_since_last_token_refresh < token_refresh_period:
                # No difference in content.  Remember the API record
                # (metadata such as name or properties may have changed)
                # but don't update the token refresh timestamp.
                if upstream_response is not None:
                    self._remember_api_response(upstream_response)
                return

            # 2nd case, no local changes, but either upstream changed
            # or we want to refresh tokens.

            self.apply(self.diff(other))
            if upstream_response is not None:
                self._remember_api_response(upstream_response)
            self._update_token_timestamp()
            self.set_committed(True)
            return

        # 3rd case, upstream changed, but we also have uncommitted
        # changes that we want to incorporate so they don't get lost.

        # _manifest_text stores the text from last time we received a
        # record from the API server.  This is the state of the
        # collection before our uncommitted changes.
        baseline = Collection(self._manifest_text)

        # Get the set of changes between our baseline and the other
        # collection and apply them to self.
        #
        # If a file was modified in both 'self' and 'other', the
        # 'apply' method keeps the contents of 'self' and creates a
        # conflict file with the contents of 'other'.
        self.apply(baseline.diff(other))

        # Remember the upstream record as the new baseline for future
        # merges; our local changes stay uncommitted.
        if upstream_response is not None:
            self._remember_api_response(upstream_response)


    @synchronized
    def _my_api(self):
        # Lazily build an API client (and reuse its Keep client) so callers
        # without API access only fail when a request is actually needed.
        if self._api_client is None:
            self._api_client = ThreadSafeAPIClient(self._config, version='v1')
            if self._keep_client is None:
                self._keep_client = self._api_client.keep
        return self._api_client

    @synchronized
    def _my_keep(self):
        if self._keep_client is None:
            if self._api_client is None:
                # _my_api() also fills in self._keep_client.
                self._my_api()
            else:
                self._keep_client = KeepClient(api_client=self._api_client)
        return self._keep_client

    @synchronized
    def _my_block_manager(self):
        if self._block_manager is None:
            copies = (self.replication_desired or
                      self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                   2))
            self._block_manager = _BlockManager(self._my_keep(),
                                                copies=copies,
                                                put_threads=self.put_threads,
                                                num_retries=self.num_retries,
                                                storage_classes_func=self.storage_classes_desired)
        return self._block_manager

    def _remember_api_response(self, response):
        # Cache the record plus the manifest/PDH it describes; the manifest
        # text becomes the baseline used by update() for three-way merges.
        self._api_response = response
        self._manifest_text = self._api_response['manifest_text']
        self._portable_data_hash = self._api_response['portable_data_hash']

    def _update_token_timestamp(self):
        self._token_refresh_timestamp = time.time()

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  Any error from the API request propagates to the caller.
        self._remember_api_response(self._my_api().collections().get(
            uuid=self._manifest_locator).execute(
                num_retries=self.num_retries))

        # If not overriden via kwargs, we should try to load the
        # replication_desired and storage_classes_desired from the API server
        if self.replication_desired is None:
            self.replication_desired = self._api_response.get('replication_desired', None)
        if self._storage_classes_desired is None:
            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)

    def _populate(self):
        if self._manifest_text is None:
            if self._manifest_locator is None:
                return
            else:
                self._populate_from_api_server()
        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)

    def _has_collection_uuid(self):
        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)

    def _has_local_collection_uuid(self):
        # True when the locator is a collection UUID whose cluster prefix
        # matches this API server.  Call _has_collection_uuid() -- testing
        # the bound method itself would always be truthy and crash below
        # when there is no locator.
        if not self._has_collection_uuid():
            return False
        return self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context with this collection instance

        If no exception was raised inside the context block, and this
        collection is writable and has a corresponding API record, that
        record will be updated to match the state of this instance at the end
        of the block. Background Keep threads are stopped in every case,
        including when that final save fails.
        """
        try:
            if exc_type is None:
                if self.writable() and self._has_collection_uuid():
                    self.save()
        finally:
            self.stop_threads()

    def stop_threads(self) -> None:
        """Stop background Keep upload/download threads"""
        if self._block_manager is not None:
            self._block_manager.stop_threads()

    @synchronized
    def manifest_locator(self) -> Optional[str]:
        """Get this collection's manifest locator, if any

        * If this collection instance is associated with an API record with a
          UUID, return that.
        * Otherwise, if this collection instance was loaded from an API record
          by portable data hash, return that.
        * Otherwise, return `None`.
        """
        return self._manifest_locator

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
            readonly: bool=False,
            new_config: Optional[Mapping[str, str]]=None,
    ) -> 'Collection':
        """Create a Collection object with the same contents as this instance

        This method creates a new Collection object with contents that match
        this instance's. The new collection will not be associated with any API
        record.

        Arguments:

        * new_parent: arvados.collection.Collection | None --- This value is
          passed to the new Collection's constructor as the `parent`
          argument.

        * new_name: str | None --- This value is unused.

        * readonly: bool --- If this value is true, this method constructs and
          returns a `CollectionReader`. Otherwise, it returns a mutable
          `Collection`. Default `False`.

        * new_config: Mapping[str, str] | None --- This value is passed to the
          new Collection's constructor as `apiconfig`. If no value is provided,
          defaults to the configuration passed to this instance's constructor.
        """
        if new_config is None:
            new_config = self._config
        if readonly:
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
        else:
            newcollection = Collection(parent=new_parent, apiconfig=new_config)

        newcollection._clonefrom(self)
        return newcollection

    @synchronized
    def api_response(self) -> Optional[Dict[str, Any]]:
        """Get this instance's associated API record

        If this Collection instance has an associated API record, return it.
        Otherwise, return `None`.
        """
        return self._api_response

    # find_or_create/find/remove accept "." for the root itself and strip a
    # leading "./" before delegating to RichCollectionBase.
    def find_or_create(
            self,
            path: str,
            create_type: CreateType,
    ) -> CollectionItem:
        if path == ".":
            return self
        else:
            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

    def find(self, path: str) -> CollectionItem:
        if path == ".":
            return self
        else:
            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

    def remove(self, path: str, recursive: bool=False) -> None:
        if path == ".":
            raise errors.ArgumentError("Cannot remove '.'")
        else:
            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)

    @must_be_writable
    @synchronized
    @retry_method
    def save(
            self,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            merge: bool=True,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ) -> str:
        """Save collection to an existing API record

        This method updates the instance's corresponding API record to match
        the instance's state. If this instance does not have a corresponding API
        record yet, raises `AssertionError`. (To create a new API record, use
        `Collection.save_new`.) This method returns the saved collection
        manifest.

        Arguments:

        * properties: dict[str, Any] | None --- If provided, the API record will
          be updated with these properties. Note this will completely replace
          any existing properties.

        * storage_classes: list[str] | None --- If provided, the API record will
          be updated with this value in the `storage_classes_desired` field.
          This value will also be saved on the instance and used for any
          changes that follow.

        * trash_at: datetime.datetime | None --- If provided, the API record
          will be updated with this value in the `trash_at` field.

        * merge: bool --- If `True` (the default), this method will first
          reload this collection's API record, and merge any new contents into
          this instance before saving changes. See `Collection.update` for
          details.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        body={}
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            # Finish uploading buffered blocks before the manifest that
            # references them is sent.
            self._my_block_manager().commit_all()

            if merge:
                self.update()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
            ).execute(num_retries=num_retries))
            self.set_committed(True)
        elif body:
            # Content unchanged; only record metadata needs updating.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
            ).execute(num_retries=num_retries))

        return self._manifest_text


    @must_be_writable
    @synchronized
    @retry_method
    def save_new(
            self,
            name: Optional[str]=None,
            create_collection_record: bool=True,
            owner_uuid: Optional[str]=None,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            ensure_unique_name: bool=False,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ):
        """Save collection to a new API record

        This method finishes uploading new data blocks and (optionally)
        creates a new API collection record with the provided data. If a new
        record is created, this instance becomes associated with that record
        for future updates like `save()`. This method returns the saved
        collection manifest.

        Arguments:

        * name: str | None --- The `name` field to use on the new collection
          record. If not specified, a generic default name is generated.

        * create_collection_record: bool --- If `True` (the default), creates a
          collection record on the API server. If `False`, the method finishes
          all data uploads and only returns the resulting collection manifest
          without sending it to the API server.

        * owner_uuid: str | None --- The `owner_uuid` field to use on the
          new collection record.

        * properties: dict[str, Any] | None --- The `properties` field to use on
          the new collection record.

        * storage_classes: list[str] | None --- The
          `storage_classes_desired` field to use on the new collection record.

        * trash_at: datetime.datetime | None --- The `trash_at` field to use
          on the new collection record.

        * ensure_unique_name: bool --- This value is passed directly to the
          Arvados API when creating the collection record. If `True`, the API
          server may modify the submitted `name` to ensure the collection's
          `name`+`owner_uuid` combination is unique. If `False` (the default),
          if a collection already exists with this same `name`+`owner_uuid`
          combination, creating a collection record will raise a validation
          error.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False

        if storage_classes:
            self._storage_classes_desired = storage_classes

        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                # A generated name is generic, so let the server
                # de-duplicate it instead of failing validation.
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "name": name,
                    "replication_desired": self.replication_desired}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid
            if properties:
                body["properties"] = properties
            if self.storage_classes_desired():
                body["storage_classes_desired"] = self.storage_classes_desired()
            if trash_at:
                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                body["trash_at"] = t
            if preserve_version:
                body["preserve_version"] = preserve_version

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            self._manifest_locator = self._api_response["uuid"]
            self.set_committed(True)

        return text

    # Manifest tokenizer: each match is one whitespace-free token plus the
    # separator that follows it (a newline ends the current stream).
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # Block locator "<32 hex md5>+<size>[+hints...]"; group(1) is the size.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # File segment "<position>:<size>:<name>".
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')

    def _unescape_manifest_path(self, path):
        # Manifests encode special characters as backslash + three octal digits.
        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)

    @synchronized
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        Each manifest line is a stream name, followed by block locators,
        followed by `position:size:name` file segments referring to the
        concatenated blocks of that stream.

        Arguments:

        * manifest_text: str --- The manifest text to import from.

        Raises `arvados.errors.ArgumentError` if this collection is not empty,
        and `arvados.errors.SyntaxError` if the manifest is malformed or a
        file name conflicts with a stream name.
        """
        if len(self) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")

        STREAM_NAME = 0
        BLOCKS = 1
        SEGMENTS = 2

        stream_name = None
        state = STREAM_NAME

        for token_and_separator in self._token_re.finditer(manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # starting a new stream
                stream_name = self._unescape_manifest_path(tok)
                blocks = []
                streamoffset = 0
                state = BLOCKS
                self.find_or_create(stream_name, COLLECTION)
                continue

            if state == BLOCKS:
                block_locator = self._block_re.match(tok)
                if block_locator:
                    blocksize = int(block_locator.group(1))
                    blocks.append(streams.Range(tok, streamoffset, blocksize, 0))
                    streamoffset += blocksize
                else:
                    # First non-locator token: fall through and parse it
                    # as a file segment.
                    state = SEGMENTS

            if state == SEGMENTS:
                file_segment = self._segment_re.match(tok)
                if file_segment:
                    pos = int(file_segment.group(1))
                    size = int(file_segment.group(2))
                    name = self._unescape_manifest_path(file_segment.group(3))
                    if name.split('/')[-1] == '.':
                        # placeholder for persisting an empty directory, not a real file
                        if len(name) > 2:
                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                    else:
                        filepath = os.path.join(stream_name, name)
                        try:
                            afile = self.find_or_create(filepath, FILE)
                        except IOError as e:
                            if e.errno == errno.ENOTDIR:
                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name." % filepath) from None
                            else:
                                raise e from None
                        if isinstance(afile, ArvadosFile):
                            afile.add_segment(blocks, pos, size)
                        else:
                            raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
                else:
                    # error!
                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

            if sep == "\n":
                stream_name = None
                state = STREAM_NAME

        self._update_token_timestamp()
        self.set_committed(True)

    @synchronized
    def notify(
            self,
            event: ChangeType,
            collection: 'RichCollectionBase',
            name: str,
            item: CollectionItem,
    ) -> None:
        # The root collection has nothing above it to forward to, so unlike
        # RichCollectionBase.notify this only invokes the local callback.
        if self._callback:
            self._callback(event, collection, name, item)
Read and manipulate an Arvados collection
This class provides a high-level interface to create, read, and update Arvados collections and their contents. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.
def __init__(self, manifest_locator_or_text: Optional[str]=None,
             api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
             keep_client: Optional['arvados.keep.KeepClient']=None,
             num_retries: int=10,
             parent: Optional['Collection']=None,
             apiconfig: Optional[Mapping[str, str]]=None,
             block_manager: Optional['arvados.arvfile._BlockManager']=None,
             replication_desired: Optional[int]=None,
             storage_classes_desired: Optional[List[str]]=None,
             put_threads: Optional[int]=None):
    """Initialize a Collection object

    Arguments:

    * manifest_locator_or_text: str | None --- This string can contain a
      collection manifest text, portable data hash, or UUID. When given a
      portable data hash or UUID, this instance will load a collection
      record from the API server. Otherwise, this instance will represent a
      new collection without an API server record. The default value `None`
      instantiates a new collection with an empty manifest.

    * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
      Arvados API client object this instance uses to make requests. If
      none is given, this instance creates its own client using the
      settings from `apiconfig` (see below). If your client instantiates
      many Collection objects, you can help limit memory utilization by
      calling `arvados.api.api` to construct an
      `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
      for every Collection.

    * keep_client: arvados.keep.KeepClient | None --- The Keep client
      object this instance uses to make requests. If none is given, this
      instance creates its own client using its `api_client`.

    * num_retries: int --- The number of times that client requests are
      retried. Default 10.

    * parent: arvados.collection.Collection | None --- The parent Collection
      object of this instance, if any. This argument is primarily used by
      other Collection methods; user client code shouldn't need to use it.

    * apiconfig: Mapping[str, str] | None --- A mapping with entries for
      `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
      `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
      Collection object constructs one from these settings. If no
      mapping is provided, calls `arvados.config.settings` to get these
      parameters from user configuration.

    * block_manager: arvados.arvfile._BlockManager | None --- The
      _BlockManager object used by this instance to coordinate reading
      and writing Keep data blocks. If none is given, this instance
      constructs its own. This argument is primarily used by other
      Collection methods; user client code shouldn't need to use it.

    * replication_desired: int | None --- This controls both the value of
      the `replication_desired` field on API collection records saved by
      this class, as well as the number of Keep services that the object
      writes new data blocks to. If none is given, uses the default value
      configured for the cluster.

    * storage_classes_desired: list[str] | None --- This controls both
      the value of the `storage_classes_desired` field on API collection
      records saved by this class, as well as selecting which specific
      Keep services the object writes new data blocks to. If none is
      given, defaults to an empty list.

    * put_threads: int | None --- The number of threads to run
      simultaneously to upload data blocks to Keep. This value is used when
      building a new `block_manager`. It is unused when a `block_manager`
      is provided.

    Raises `arvados.errors.ArgumentError` in three cases:

    * `storage_classes_desired` is given but is not a list.
    * `manifest_locator_or_text` matches none of the Keep locator,
      collection UUID, or manifest patterns.
    * Loading the collection raises `arvados.errors.SyntaxError`.
    """

    if storage_classes_desired and type(storage_classes_desired) is not list:
        raise errors.ArgumentError("storage_classes_desired must be list type.")

    super(Collection, self).__init__(parent)
    self._api_client = api_client
    self._keep_client = keep_client

    # Use the keep client from ThreadSafeAPIClient
    if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
        self._keep_client = self._api_client.keep

    self._block_manager = block_manager
    self.replication_desired = replication_desired
    self._storage_classes_desired = storage_classes_desired
    self.put_threads = put_threads

    if apiconfig:
        self._config = apiconfig
    else:
        self._config = config.settings()

    self.num_retries = num_retries
    self._manifest_locator = None
    self._manifest_text = None
    self._portable_data_hash = None
    self._api_response = None
    self._token_refresh_timestamp = 0

    self.lock = threading.RLock()
    self.events = None

    if manifest_locator_or_text:
        if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
        elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            # A collection owned by another cluster: its blocks must be
            # copied to the local cluster before saving.
            if not self._has_local_collection_uuid():
                self._has_remote_blocks = True
        elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            # '+R' signatures mark remote blocks (copied locally on save).
            if '+R' in self._manifest_text:
                self._has_remote_blocks = True
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader is not a manifest or a collection UUID")

        try:
            self._populate()
        except errors.SyntaxError as e:
            # Interpolate the message; passing it as a second argument
            # would leave the exception text as an unformatted tuple.
            raise errors.ArgumentError("Error processing manifest text: %s" % str(e)) from None
Initialize a Collection object
Arguments:
manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value
None
instantiates a new collection with an empty manifest.
api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from
apiconfig
(see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling arvados.api.api
to construct an arvados.api.ThreadSafeAPIClient
, and use that as the api_client
for every Collection.
keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its
api_client
.
num_retries: int — The number of times that client requests are retried. Default 10.
parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
apiconfig: Mapping[str, str] | None — A mapping with entries for
ARVADOS_API_HOST
, ARVADOS_API_TOKEN
, and optionally ARVADOS_API_HOST_INSECURE
. When no api_client
is provided, the Collection object constructs one from these settings. If no mapping is provided, calls arvados.config.settings
to get these parameters from user configuration.
block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
replication_desired: int | None — This controls both the value of the
replication_desired
field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.
storage_classes_desired: list[str] | None — This controls both the value of the
storage_classes_desired
field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.
put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new
block_manager
. It is unused when a block_manager
is provided.
def storage_classes_desired(self) -> List[str]:
    """Return the `storage_classes_desired` value for this collection.

    If no storage classes are stored (the attribute is `None` or empty),
    a fresh empty list is returned.
    """
    configured = self._storage_classes_desired
    if not configured:
        return []
    return configured
Get this collection’s storage_classes_desired
value
Get this collection’s root collection object
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the source Collection object.
def get_properties(self) -> Properties:
    """Return this collection's properties.

    The result is always a dict. It is `{}` when this object has no
    associated API record, or when that record's `properties` value is
    empty or unset.
    """
    response = self._api_response
    if not response:
        return {}
    props = response["properties"]
    return props if props else {}
Get this collection’s properties
This method always returns a dict. If this collection object does not have an associated API record, or that record does not have any properties set, this method returns an empty dict.
def get_trash_at(self) -> Optional[datetime.datetime]:
    """Return this collection's `trash_at` field as a datetime.

    Returns None in three cases:

    * this object has no associated API record;
    * the record's `trash_at` value is empty or unset;
    * the stored value cannot be parsed.
    """
    response = self._api_response
    if not response:
        return None
    raw_value = response["trash_at"]
    if not raw_value:
        return None
    try:
        return ciso8601.parse_datetime(raw_value)
    except ValueError:
        return None
Get this collection’s trash_at
field
This method parses the trash_at
field of the collection’s API
record and returns a datetime from it. If that field is not set, or
this collection object does not have an associated API record,
returns None.
Get the name of the manifest stream represented by this collection
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the name of the stream you opened.
Indicate whether this collection object can be modified
This method returns False
if this object is a CollectionReader
,
else True
.
@synchronized
@retry_method
def update(
        self,
        other: Optional['Collection']=None,
        num_retries: Optional[int]=None,
) -> None:
    """Merge another collection's contents into this one

    This method compares the manifest of this collection instance with
    another, then updates this instance's manifest with changes from the
    other, renaming files to flag conflicts where necessary.

    When called without any arguments, this method reloads the collection's
    API record, and updates this instance with any changes that have
    appeared server-side. If this instance does not have a corresponding
    API record, this method raises `arvados.errors.ArgumentError`.

    The merge takes one of three paths:

    1. No uncommitted local changes, the portable data hash equals
       `other`'s, and the last token refresh was less than an hour ago:
       nothing is applied. A freshly reloaded API record, if any, is still
       remembered.
    2. No uncommitted local changes, but the content differs or the token
       refresh period has elapsed: the diff to `other` is applied, the
       token refresh timestamp is updated, and the instance is marked
       committed.
    3. Uncommitted local changes exist: the diff between the last manifest
       received from the API server and `other` is applied on top of the
       local changes. If a file changed on both sides, the local contents
       are kept and a conflict file is created with `other`'s contents.

    Arguments:

    * other: arvados.collection.Collection | None --- The collection
      whose contents should be merged into this instance. When not
      provided, this method reloads this collection's API record and
      constructs a Collection object from it. If this instance does not
      have a corresponding API record, this method raises
      `arvados.errors.ArgumentError`.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.
    """

    # One hour, in seconds (time.time() is in seconds). Past this, content
    # is re-applied even if unchanged so tokens get refreshed.
    token_refresh_period = 60*60
    time_since_last_token_refresh = (time.time() - self._token_refresh_timestamp)
    # The API record fetched below, if any; each merge path remembers it.
    upstream_response = None

    if other is None:
        if self._manifest_locator is None:
            raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")

        # Loaded by portable data hash: presumably the content at that
        # hash cannot change, so skip the reload until tokens need a refresh.
        if re.match(arvados.util.portable_data_hash_pattern, self._manifest_locator) and time_since_last_token_refresh < token_refresh_period:
            return

        upstream_response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
        other = CollectionReader(upstream_response["manifest_text"])

    if self.committed():
        # 1st case, no local changes, content is the same
        if self.portable_data_hash() == other.portable_data_hash() and time_since_last_token_refresh < token_refresh_period:
            # No difference in content. Remember the API record
            # (metadata such as name or properties may have changed)
            # but don't update the token refresh timestamp.
            if upstream_response is not None:
                self._remember_api_response(upstream_response)
            return

        # 2nd case, no local changes, but either upstream changed
        # or we want to refresh tokens.

        self.apply(self.diff(other))
        if upstream_response is not None:
            self._remember_api_response(upstream_response)
        self._update_token_timestamp()
        self.set_committed(True)
        return

    # 3rd case, upstream changed, but we also have uncommitted
    # changes that we want to incorporate so they don't get lost.

    # _manifest_text stores the text from last time we received a
    # record from the API server. This is the state of the
    # collection before our uncommitted changes.
    baseline = Collection(self._manifest_text)

    # Get the set of changes between our baseline and the other
    # collection and apply them to self.
    #
    # If a file was modified in both 'self' and 'other', the
    # 'apply' method keeps the contents of 'self' and creates a
    # conflict file with the contents of 'other'.
    self.apply(baseline.diff(other))

    # Remember the fetched API record so it becomes the baseline
    # (see the _manifest_text note above) for the next merge.
    if upstream_response is not None:
        self._remember_api_response(upstream_response)
Merge another collection’s contents into this one
This method compares the manifest of this collection instance with another, then updates this instance’s manifest with changes from the other, renaming files to flag conflicts where necessary.
When called without any arguments, this method reloads the collection’s
API record, and updates this instance with any changes that have
appeared server-side. If this instance does not have a corresponding
API record, this method raises arvados.errors.ArgumentError
.
Arguments:
other: Collection | None — The collection whose contents should be merged into this instance. When not provided, this method reloads this collection’s API record and constructs a Collection object from it. If this instance does not have a corresponding API record, this method raises
arvados.errors.ArgumentError
.
num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the
num_retries
provided when this instance was constructed.
def stop_threads(self) -> None:
    """Stop the background Keep upload/download threads.

    Does nothing when `_block_manager` is None.
    """
    manager = self._block_manager
    if manager is None:
        return
    manager.stop_threads()
Stop background Keep upload/download threads
@synchronized
def manifest_locator(self) -> Optional[str]:
    """Return the locator associated with this collection, if any.

    * When this instance is tied to an API record that has a UUID, the
      UUID is returned.
    * Otherwise, when it was loaded from an API record by portable data
      hash, that hash is returned.
    * Otherwise the result is `None`.
    """
    locator = self._manifest_locator
    return locator
Get this collection’s manifest locator, if any
- If this collection instance is associated with an API record with a UUID, return that.
- Otherwise, if this collection instance was loaded from an API record by portable data hash, return that.
- Otherwise, return
None
.
@synchronized
def clone(
        self,
        new_parent: Optional['Collection']=None,
        new_name: Optional[str]=None,
        readonly: bool=False,
        new_config: Optional[Mapping[str, str]]=None,
) -> 'Collection':
    """Create a Collection object with the same contents as this instance

    The returned object holds a copy of this instance's contents and is
    not tied to any API record.

    Arguments:

    * new_parent: arvados.collection.Collection | None --- Passed to the
      new object's constructor as `parent`.

    * new_name: str | None --- Accepted but not used.

    * readonly: bool --- When true, a `CollectionReader` is built;
      otherwise a mutable `Collection`. Default `False`.

    * new_config: Mapping[str, str] | None --- Passed to the new object's
      constructor as `apiconfig`. When omitted, this instance's own
      configuration is reused.
    """
    apiconfig = self._config if new_config is None else new_config
    factory = CollectionReader if readonly else Collection
    newcollection = factory(parent=new_parent, apiconfig=apiconfig)
    newcollection._clonefrom(self)
    return newcollection
Create a Collection object with the same contents as this instance
This method creates a new Collection object with contents that match this instance’s. The new collection will not be associated with any API record.
Arguments:
new_parent: Collection | None — This value is passed to the new Collection’s constructor as the
parent
argument.
new_name: str | None — This value is unused.
readonly: bool — If this value is true, this method constructs and returns a
CollectionReader
. Otherwise, it returns a mutable Collection
. Default False
.
new_config: Mapping[str, str] | None — This value is passed to the new Collection’s constructor as
apiconfig
. If no value is provided, defaults to the configuration passed to this instance’s constructor.
@synchronized
def api_response(self) -> Optional[Dict[str, Any]]:
    """Return the API record associated with this instance.

    The result is `None` when this Collection has no associated record.
    """
    record = self._api_response
    return record
Get this instance’s associated API record
If this Collection instance has an associated API record, return it.
Otherwise, return None
.
def find_or_create(
        self,
        path: str,
        create_type: CreateType,
) -> CollectionItem:
    """Return the item at `path`, creating it with `create_type` if missing.

    The path `"."` means this collection itself. A leading `"./"` is
    stripped before the lookup is delegated to the base class.
    """
    if path != ".":
        if path.startswith("./"):
            path = path[2:]
        return super(Collection, self).find_or_create(path, create_type)
    return self
Get the item at the given path, creating it if necessary
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method creates a new object and returns
it, creating parent streams as needed. The type of object created is
determined by the value of create_type
.
Arguments:
path: str — The path to find or create within this collection.
create_type: Literal[COLLECTION, FILE] — The type of object to create at
path
if one does not exist. Passing COLLECTION
creates a stream and returns the corresponding Subcollection
. Passing FILE
creates a new file and returns the corresponding arvados.arvfile.ArvadosFile
.
def find(self, path: str) -> CollectionItem:
    """Return the item at `path`.

    The path `"."` means this collection itself. A leading `"./"` is
    stripped before the lookup is delegated to the base class.
    """
    if path != ".":
        if path.startswith("./"):
            path = path[2:]
        return super(Collection, self).find(path)
    return self
Get the item at the given path
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method raises NotADirectoryError
.
Arguments:
- path: str — The path to find within this collection.
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the file or stream at `path`.

    Removing `"."` (this collection itself) is rejected with
    `arvados.errors.ArgumentError`. A leading `"./"` is stripped before
    the call is delegated to the base class.
    """
    if path == ".":
        raise errors.ArgumentError("Cannot remove '.'")
    target = path[2:] if path.startswith("./") else path
    return super(Collection, self).remove(target, recursive)
Remove the file or stream at path
Arguments:
path: str — The path of the item to remove from the collection
recursive: bool — Controls the method’s behavior if
path
refers to a nonempty stream. If False
(the default), this method raises OSError
with errno ENOTEMPTY
. If True
, this method removes all items under the stream.
@must_be_writable
@synchronized
@retry_method
def save(
        self,
        properties: Optional[Properties]=None,
        storage_classes: Optional[StorageClasses]=None,
        trash_at: Optional[datetime.datetime]=None,
        merge: bool=True,
        num_retries: Optional[int]=None,
        preserve_version: bool=False,
) -> str:
    """Save collection to an existing API record

    This method updates the instance's corresponding API record to match
    the instance's state. If this instance does not have a corresponding API
    record yet, raises `AssertionError`. (To create a new API record, use
    `Collection.save_new`.) This method returns the saved collection
    manifest.

    If this instance has no uncommitted changes, the manifest is not
    re-sent. The API record is then updated only when at least one of the
    metadata arguments below contributes a field.

    Arguments:

    * properties: dict[str, Any] | None --- If provided, the API record will
      be updated with these properties. Note this will completely replace
      any existing properties.

    * storage_classes: list[str] | None --- If provided, the API record will
      be updated with this value in the `storage_classes_desired` field.
      This value will also be saved on the instance and used for any
      changes that follow.

    * trash_at: datetime.datetime | None --- If provided, the API record
      will be updated with this value in the `trash_at` field.

    * merge: bool --- If `True` (the default), this method will first
      reload this collection's API record, and merge any new contents into
      this instance before saving changes. See `Collection.update` for
      details.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.

    * preserve_version: bool --- This value will be passed directly to
      the underlying API call. If `True`, the Arvados API will
      preserve the versions of this collection both immediately before
      and after the update. If `True` when the API server is not
      configured with collection versioning, this method raises
      `arvados.errors.ArgumentError`.

    Raises `arvados.errors.ArgumentError` for invalid arguments. All
    arguments are validated before this instance's state is changed.

    Raises `AssertionError` when there are uncommitted changes and this
    instance is not associated with a collection record on the local
    cluster. In that case the check runs before any remote blocks are
    copied.
    """
    if properties and type(properties) is not dict:
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and type(storage_classes) is not list:
        raise errors.ArgumentError("storage_classes must be list type.")

    if trash_at and type(trash_at) is not datetime.datetime:
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    # Record the new storage classes only after every argument has been
    # validated, so a rejected call leaves this instance unchanged
    # (consistent with save_new).
    if storage_classes:
        self._storage_classes_desired = storage_classes

    body={}
    if properties:
        body["properties"] = properties
    if self.storage_classes_desired():
        body["storage_classes_desired"] = self.storage_classes_desired()
    if trash_at:
        t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        body["trash_at"] = t
    if preserve_version:
        body["preserve_version"] = preserve_version

    if not self.committed():
        # Check the target record first so we don't copy remote blocks
        # only to fail immediately afterwards.
        if not self._has_collection_uuid():
            raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
        elif not self._has_local_collection_uuid():
            raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False

        self._my_block_manager().commit_all()

        if merge:
            self.update()

        text = self.manifest_text(strip=False)
        body['manifest_text'] = text

        self._remember_api_response(self._my_api().collections().update(
            uuid=self._manifest_locator,
            body=body
        ).execute(num_retries=num_retries))
        self.set_committed(True)
    elif body:
        # No content changes, but metadata fields were supplied.
        self._remember_api_response(self._my_api().collections().update(
            uuid=self._manifest_locator,
            body=body
        ).execute(num_retries=num_retries))

    return self._manifest_text
Save collection to an existing API record
This method updates the instance’s corresponding API record to match
the instance’s state. If this instance does not have a corresponding API
record yet, raises AssertionError
. (To create a new API record, use
Collection.save_new
.) This method returns the saved collection
manifest.
Arguments:
properties: dict[str, Any] | None — If provided, the API record will be updated with these properties. Note this will completely replace any existing properties.
storage_classes: list[str] | None — If provided, the API record will be updated with this value in the
storage_classes_desired
field. This value will also be saved on the instance and used for any changes that follow.
trash_at: datetime.datetime | None — If provided, the API record will be updated with this value in the
trash_at
field.
merge: bool — If
True
(the default), this method will first reload this collection’s API record, and merge any new contents into this instance before saving changes. See Collection.update
for details.
num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the
num_retries
provided when this instance was constructed.
preserve_version: bool — This value will be passed directly to the underlying API call. If
True
, the Arvados API will preserve the versions of this collection both immediately before and after the update. If True
when the API server is not configured with collection versioning, this method raises arvados.errors.ArgumentError
.
@must_be_writable
@synchronized
@retry_method
def save_new(
        self,
        name: Optional[str]=None,
        create_collection_record: bool=True,
        owner_uuid: Optional[str]=None,
        properties: Optional[Properties]=None,
        storage_classes: Optional[StorageClasses]=None,
        trash_at: Optional[datetime.datetime]=None,
        ensure_unique_name: bool=False,
        num_retries: Optional[int]=None,
        preserve_version: bool=False,
) -> str:
    """Save collection to a new API record

    This method finishes uploading new data blocks and (optionally)
    creates a new API collection record with the provided data. If a new
    record is created, this instance becomes associated with that record
    for future updates like `save()`. This method returns the saved
    collection manifest.

    Arguments:

    * name: str | None --- The `name` field to use on the new collection
      record. If not specified, the name "New collection" is used and
      `ensure_unique_name` is forced to `True`.

    * create_collection_record: bool --- If `True` (the default), creates a
      collection record on the API server. If `False`, the method finishes
      all data uploads and only returns the resulting collection manifest
      without sending it to the API server.

    * owner_uuid: str | None --- The `owner_uuid` field to use on the
      new collection record.

    * properties: dict[str, Any] | None --- The `properties` field to use on
      the new collection record.

    * storage_classes: list[str] | None --- The
      `storage_classes_desired` field to use on the new collection record.
      This value is also saved on the instance.

    * trash_at: datetime.datetime | None --- The `trash_at` field to use
      on the new collection record.

    * ensure_unique_name: bool --- This value is passed directly to the
      Arvados API when creating the collection record. If `True`, the API
      server may modify the submitted `name` to ensure the collection's
      `name`+`owner_uuid` combination is unique. If `False` (the default),
      if a collection already exists with this same `name`+`owner_uuid`
      combination, creating a collection record will raise a validation
      error.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.

    * preserve_version: bool --- This value will be passed directly to
      the underlying API call. If `True`, the Arvados API will
      preserve the versions of this collection both immediately before
      and after the update. If `True` when the API server is not
      configured with collection versioning, this method raises
      `arvados.errors.ArgumentError`.

    Raises `arvados.errors.ArgumentError` for invalid arguments. All
    arguments are validated before any blocks are copied or uploaded.
    """
    if properties and type(properties) is not dict:
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and type(storage_classes) is not list:
        raise errors.ArgumentError("storage_classes must be list type.")

    if trash_at and type(trash_at) is not datetime.datetime:
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    if self._has_remote_blocks:
        # Copy any remote blocks to the local cluster.
        self._copy_remote_blocks(remote_blocks={})
        self._has_remote_blocks = False

    if storage_classes:
        self._storage_classes_desired = storage_classes

    self._my_block_manager().commit_all()
    text = self.manifest_text(strip=False)

    if create_collection_record:
        if name is None:
            name = "New collection"
            # The generic default name is likely to collide, so always
            # let the API server make it unique.
            ensure_unique_name = True

        body = {"manifest_text": text,
                "name": name,
                "replication_desired": self.replication_desired}
        if owner_uuid:
            body["owner_uuid"] = owner_uuid
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
        # Associate this instance with the new record for later save() calls.
        self._manifest_locator = self._api_response["uuid"]
        self.set_committed(True)

    return text
Save collection to a new API record
This method finishes uploading new data blocks and (optionally)
creates a new API collection record with the provided data. If a new
record is created, this instance becomes associated with that record
for future updates like save()
. This method returns the saved
collection manifest.
Arguments:
name: str | None — The
name
field to use on the new collection record. If not specified, the name "New collection" is used and ensure_unique_name is forced to True. create_collection_record: bool — If
True
(the default), creates a collection record on the API server. If False
, the method finishes all data uploads and only returns the resulting collection manifest without sending it to the API server. owner_uuid: str | None — The
owner_uuid
field to use on the new collection record. properties: dict[str, Any] | None — The
properties
field to use on the new collection record. storage_classes: list[str] | None — The
storage_classes_desired
field to use on the new collection record. trash_at: datetime.datetime | None — The
trash_at
field to use on the new collection record. ensure_unique_name: bool — This value is passed directly to the Arvados API when creating the collection record. If
True
, the API server may modify the submitted name
to ensure the collection’s name
+owner_uuid
combination is unique. If False
(the default), if a collection already exists with this same name
+owner_uuid
combination, creating a collection record will raise a validation error. num_retries: int | None — The number of times to retry the API request that creates the collection record. If not specified, uses the
num_retries
provided when this instance was constructed. preserve_version: bool — This value will be passed directly to the underlying API call. If
True
, the Arvados API will preserve the versions of this collection both immediately before and after the update. If True
when the API server is not configured with collection versioning, this method raises arvados.errors.ArgumentError
.
@synchronized
def notify(
        self,
        event: ChangeType,
        collection: 'RichCollectionBase',
        name: str,
        item: CollectionItem,
) -> None:
    """Deliver a change notification to this collection's subscriber

    If a callback is registered on this object, it is called with
    `(event, collection, name, item)`. Nothing else happens here: this
    implementation does not forward the notification to any other object.

    Arguments:

    * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification.

    * collection: RichCollectionBase --- The collection that was modified.

    * name: str --- The name of the file or stream within `collection`
      that was modified.

    * item: CollectionItem --- The item associated with the change. For
      MOD and TOK events this is a 2-tuple of the previous and new item.
    """
    subscriber = self._callback
    if not subscriber:
        return
    subscriber(event, collection, name, item)
Notify any subscribed callback about a change to this collection
If a callback has been registered with RichCollectionBase.subscribe
,
it will be called with information about a change to this collection.
Then this notification will be propagated to this collection’s root.
Arguments:
event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.
collection: RichCollectionBase — The collection that was modified.
name: str — The name of the file or stream within
collection
that was modified. item: arvados.arvfile.ArvadosFile | Subcollection — For ADD events, the new contents at
name
within collection
; for DEL events, the item that was removed. For MOD and TOK events, a 2-tuple of the previous item and the new item (may be the same object or different, depending on whether the action involved it being modified in place or replaced).
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) within an Arvados
    `Collection`. It is returned by `Collection.find` and provides the same API.
    Operations that work on the API collection record propagate to the parent
    `Collection` object.
    """

    def __init__(self, parent, name):
        super().__init__(parent)
        # Reuse the root collection's lock rather than creating a separate
        # one; `_reparent` re-reads it when this subcollection is moved.
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    # The methods below delegate to the root collection: a subcollection
    # reports the root's writability and uses the root's API client, Keep
    # client, and block manager.
    def root_collection(self) -> 'Collection':
        return self.parent.root_collection()

    def writable(self) -> bool:
        return self.root_collection().writable()

    def _my_api(self):
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self) -> str:
        # This stream's path is the parent's stream path joined with our name.
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        # NOTE(review): despite the default, new_parent=None cannot work here:
        # __init__ dereferences `parent.num_retries` (and walks the parent
        # chain via root_collection()), so callers must pass a real parent.
        c = Subcollection(new_parent, new_name)
        c._clonefrom(self)
        return c

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Detach this subcollection and point it at a new parent and name

        Marks this subcollection uncommitted, calls `flush()`, and removes it
        (recursively) from its current parent. It then updates the parent,
        name, and lock references. This method does not itself insert the
        subcollection into `newparent`.
        """
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        r"""Encode empty directories by using an \056-named (".") empty file

        When this subcollection has no items, return a single manifest stream
        line for `stream_name` that references the empty block locator and
        lists one zero-length file named `\056` (the escaped form of ".").
        Otherwise defer to the base class implementation.
        """
        # A raw docstring is used above so `\056` is shown literally instead
        # of being interpreted as an octal escape for ".".
        if not self._items:
            return "%s %s 0:0:\\056\n" % (
                streams.escape(stream_name),
                config.EMPTY_BLOCK_LOCATOR)
        return super()._get_manifest_text(stream_name, strip, normalize,
                                          only_committed)
Read and manipulate a stream/directory within an Arvados collection
This class represents a single stream (like a directory) within an Arvados
Collection
. It is returned by Collection.find
and provides the same API.
Operations that work on the API collection record propagate to the parent
Collection
object.
Get this collection’s root collection object
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the source Collection object.
Indicate whether this collection object can be modified
This method returns False
if this object is a CollectionReader
,
else True
.
Get the name of the manifest stream represented by this collection
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the name of the stream you opened.
Inherited Members
class CollectionReader(Collection):
    """Read-only `Collection` subclass

    This class will never create or update any API collection records. You can
    use this class for additional code safety when you only need to read
    existing collections.
    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # writable() reports True only while this flag is set, i.e. while the
        # base-class constructor below is running (presumably so it can load
        # the collection contents before the object becomes read-only).
        self._in_init = True
        super().__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Contents are not expected to change after construction, so a no-op
        # lock replaces the real one from here on.
        self.lock = NoopLock()

        # Retained for backwards compatibility with the old CollectionReader
        # all_streams() and all_files() API.
        self._streams = None

    def writable(self) -> bool:
        # True only during __init__ (see `_in_init`); False afterwards.
        return self._in_init
Read-only Collection
subclass
This class will never create or update any API collection records. You can use this class for additional code safety when you only need to read existing collections.
def __init__(self, manifest_locator_or_text, *args, **kwargs):
    # Build the collection via the base-class constructor, then switch this
    # object into read-only mode.
    #
    # writable() returns `_in_init`, so it reports True only while the
    # base-class constructor runs; the flag is cleared right afterwards.
    self._in_init = True
    super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
    self._in_init = False

    # Forego any locking since it should never change once initialized.
    self.lock = NoopLock()

    # Backwards compatibility with old CollectionReader
    # all_streams() and all_files()
    self._streams = None
Initialize a Collection object
Arguments:
manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value
None
instantiates a new collection with an empty manifest. api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from
apiconfig
(see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling arvados.api.api
to construct an arvados.api.ThreadSafeAPIClient
, and use that as the api_client
for every Collection. keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its
api_client
. num_retries: int — The number of times that client requests are retried. Default 10.
parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
apiconfig: Mapping[str, str] | None — A mapping with entries for
ARVADOS_API_HOST
, ARVADOS_API_TOKEN
, and optionally ARVADOS_API_HOST_INSECURE
. When no api_client
is provided, the Collection object constructs one from these settings. If no mapping is provided, calls arvados.config.settings
to get these parameters from user configuration. block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
replication_desired: int | None — This controls both the value of the
replication_desired
field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster. storage_classes_desired: list[str] | None — This controls both the value of the
storage_classes_desired
field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list. put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new
block_manager
. It is unused when a block_manager
is provided.
Indicate whether this collection object can be modified
This method returns False if this object is a CollectionReader (a reader reports True only while its constructor is still running), else True.