arvados.collection
Tools to work with Arvados collections
This module provides high-level interfaces to create, read, and update
Arvados collections. Most users will want to instantiate `Collection`
objects, and use methods like `Collection.open` and `Collection.mkdirs` to
read and write data in the collection. Refer to the Arvados Python SDK
cookbook for an introduction to using the Collection class
(https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections).
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4"""Tools to work with Arvados collections 5 6This module provides high-level interfaces to create, read, and update 7Arvados collections. Most users will want to instantiate `Collection` 8objects, and use methods like `Collection.open` and `Collection.mkdirs` to 9read and write data in the collection. Refer to the Arvados Python SDK 10cookbook for [an introduction to using the Collection class][cookbook]. 11 12[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 13""" 14 15import ciso8601 16import datetime 17import errno 18import functools 19import hashlib 20import io 21import logging 22import os 23import re 24import sys 25import threading 26import time 27 28from collections import deque 29from stat import * 30 31from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock 32from .keep import KeepLocator, KeepClient 33from ._normalize_stream import normalize_stream, escape 34from ._ranges import Range, LocatorAndRange 35from .safeapi import ThreadSafeApiCache 36import arvados.config as config 37import arvados.errors as errors 38import arvados.util 39import arvados.events as events 40from arvados.retry import retry_method 41 42from typing import ( 43 Any, 44 Callable, 45 Dict, 46 IO, 47 Iterator, 48 List, 49 Mapping, 50 Optional, 51 Tuple, 52 Union, 53) 54 55if sys.version_info < (3, 8): 56 from typing_extensions import Literal 57else: 58 from typing import Literal 59 60_logger = logging.getLogger('arvados.collection') 61 62ADD = "add" 63"""Argument value for `Collection` methods to represent an added item""" 64DEL = "del" 65"""Argument value for `Collection` methods to represent a removed item""" 66MOD = "mod" 67"""Argument value for `Collection` methods to represent a modified item""" 68TOK = "tok" 69"""Argument value for 
`Collection` methods to represent an item with token differences""" 70FILE = "file" 71"""`create_type` value for `Collection.find_or_create`""" 72COLLECTION = "collection" 73"""`create_type` value for `Collection.find_or_create`""" 74 75ChangeList = List[Union[ 76 Tuple[Literal[ADD, DEL], str, 'Collection'], 77 Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'], 78]] 79ChangeType = Literal[ADD, DEL, MOD, TOK] 80CollectionItem = Union[ArvadosFile, 'Collection'] 81ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object] 82CreateType = Literal[COLLECTION, FILE] 83Properties = Dict[str, Any] 84StorageClasses = List[str] 85 86class CollectionBase(object): 87 """Abstract base class for Collection classes 88 89 .. ATTENTION:: Internal 90 This class is meant to be used by other parts of the SDK. User code 91 should instantiate or subclass `Collection` or one of its subclasses 92 directly. 93 """ 94 95 def __enter__(self): 96 """Enter a context block with this collection instance""" 97 return self 98 99 def __exit__(self, exc_type, exc_value, traceback): 100 """Exit a context block with this collection instance""" 101 pass 102 103 def _my_keep(self): 104 if self._keep_client is None: 105 self._keep_client = KeepClient(api_client=self._api_client, 106 num_retries=self.num_retries) 107 return self._keep_client 108 109 def stripped_manifest(self) -> str: 110 """Create a copy of the collection manifest with only size hints 111 112 This method returns a string with the current collection's manifest 113 text with all non-portable locator hints like permission hints and 114 remote cluster hints removed. The only hints in the returned manifest 115 will be size hints. 
116 """ 117 raw = self.manifest_text() 118 clean = [] 119 for line in raw.split("\n"): 120 fields = line.split() 121 if fields: 122 clean_fields = fields[:1] + [ 123 (re.sub(r'\+[^\d][^\+]*', '', x) 124 if re.match(arvados.util.keep_locator_pattern, x) 125 else x) 126 for x in fields[1:]] 127 clean += [' '.join(clean_fields), "\n"] 128 return ''.join(clean) 129 130 131class _WriterFile(_FileLikeObjectBase): 132 def __init__(self, coll_writer, name): 133 super(_WriterFile, self).__init__(name, 'wb') 134 self.dest = coll_writer 135 136 def close(self): 137 super(_WriterFile, self).close() 138 self.dest.finish_current_file() 139 140 @_FileLikeObjectBase._before_close 141 def write(self, data): 142 self.dest.write(data) 143 144 @_FileLikeObjectBase._before_close 145 def writelines(self, seq): 146 for data in seq: 147 self.write(data) 148 149 @_FileLikeObjectBase._before_close 150 def flush(self): 151 self.dest.flush_data() 152 153 154class RichCollectionBase(CollectionBase): 155 """Base class for Collection classes 156 157 .. ATTENTION:: Internal 158 This class is meant to be used by other parts of the SDK. User code 159 should instantiate or subclass `Collection` or one of its subclasses 160 directly. 161 """ 162 163 def __init__(self, parent=None): 164 self.parent = parent 165 self._committed = False 166 self._has_remote_blocks = False 167 self._callback = None 168 self._items = {} 169 170 def _my_api(self): 171 raise NotImplementedError() 172 173 def _my_keep(self): 174 raise NotImplementedError() 175 176 def _my_block_manager(self): 177 raise NotImplementedError() 178 179 def writable(self) -> bool: 180 """Indicate whether this collection object can be modified 181 182 This method returns `False` if this object is a `CollectionReader`, 183 else `True`. 
184 """ 185 raise NotImplementedError() 186 187 def root_collection(self) -> 'Collection': 188 """Get this collection's root collection object 189 190 If you open a subcollection with `Collection.find`, calling this method 191 on that subcollection returns the source Collection object. 192 """ 193 raise NotImplementedError() 194 195 def stream_name(self) -> str: 196 """Get the name of the manifest stream represented by this collection 197 198 If you open a subcollection with `Collection.find`, calling this method 199 on that subcollection returns the name of the stream you opened. 200 """ 201 raise NotImplementedError() 202 203 @synchronized 204 def has_remote_blocks(self) -> bool: 205 """Indiciate whether the collection refers to remote data 206 207 Returns `True` if the collection manifest includes any Keep locators 208 with a remote hint (`+R`), else `False`. 209 """ 210 if self._has_remote_blocks: 211 return True 212 for item in self: 213 if self[item].has_remote_blocks(): 214 return True 215 return False 216 217 @synchronized 218 def set_has_remote_blocks(self, val: bool) -> None: 219 """Cache whether this collection refers to remote blocks 220 221 .. ATTENTION:: Internal 222 This method is only meant to be used by other Collection methods. 223 224 Set this collection's cached "has remote blocks" flag to the given 225 value. 226 """ 227 self._has_remote_blocks = val 228 if self.parent: 229 self.parent.set_has_remote_blocks(val) 230 231 @must_be_writable 232 @synchronized 233 def find_or_create( 234 self, 235 path: str, 236 create_type: CreateType, 237 ) -> CollectionItem: 238 """Get the item at the given path, creating it if necessary 239 240 If `path` refers to a stream in this collection, returns a 241 corresponding `Subcollection` object. If `path` refers to a file in 242 this collection, returns a corresponding 243 `arvados.arvfile.ArvadosFile` object. 
If `path` does not exist in 244 this collection, then this method creates a new object and returns 245 it, creating parent streams as needed. The type of object created is 246 determined by the value of `create_type`. 247 248 Arguments: 249 250 * path: str --- The path to find or create within this collection. 251 252 * create_type: Literal[COLLECTION, FILE] --- The type of object to 253 create at `path` if one does not exist. Passing `COLLECTION` 254 creates a stream and returns the corresponding 255 `Subcollection`. Passing `FILE` creates a new file and returns the 256 corresponding `arvados.arvfile.ArvadosFile`. 257 """ 258 pathcomponents = path.split("/", 1) 259 if pathcomponents[0]: 260 item = self._items.get(pathcomponents[0]) 261 if len(pathcomponents) == 1: 262 if item is None: 263 # create new file 264 if create_type == COLLECTION: 265 item = Subcollection(self, pathcomponents[0]) 266 else: 267 item = ArvadosFile(self, pathcomponents[0]) 268 self._items[pathcomponents[0]] = item 269 self.set_committed(False) 270 self.notify(ADD, self, pathcomponents[0], item) 271 return item 272 else: 273 if item is None: 274 # create new collection 275 item = Subcollection(self, pathcomponents[0]) 276 self._items[pathcomponents[0]] = item 277 self.set_committed(False) 278 self.notify(ADD, self, pathcomponents[0], item) 279 if isinstance(item, RichCollectionBase): 280 return item.find_or_create(pathcomponents[1], create_type) 281 else: 282 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 283 else: 284 return self 285 286 @synchronized 287 def find(self, path: str) -> CollectionItem: 288 """Get the item at the given path 289 290 If `path` refers to a stream in this collection, returns a 291 corresponding `Subcollection` object. If `path` refers to a file in 292 this collection, returns a corresponding 293 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 294 this collection, then this method raises `NotADirectoryError`. 
295 296 Arguments: 297 298 * path: str --- The path to find or create within this collection. 299 """ 300 if not path: 301 raise errors.ArgumentError("Parameter 'path' is empty.") 302 303 pathcomponents = path.split("/", 1) 304 if pathcomponents[0] == '': 305 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 306 307 item = self._items.get(pathcomponents[0]) 308 if item is None: 309 return None 310 elif len(pathcomponents) == 1: 311 return item 312 else: 313 if isinstance(item, RichCollectionBase): 314 if pathcomponents[1]: 315 return item.find(pathcomponents[1]) 316 else: 317 return item 318 else: 319 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 320 321 @synchronized 322 def mkdirs(self, path: str) -> 'Subcollection': 323 """Create and return a subcollection at `path` 324 325 If `path` exists within this collection, raises `FileExistsError`. 326 Otherwise, creates a stream at that path and returns the 327 corresponding `Subcollection`. 328 """ 329 if self.find(path) != None: 330 raise IOError(errno.EEXIST, "Directory or file exists", path) 331 332 return self.find_or_create(path, COLLECTION) 333 334 def open( 335 self, 336 path: str, 337 mode: str="r", 338 encoding: Optional[str]=None 339 ) -> IO: 340 """Open a file-like object within the collection 341 342 This method returns a file-like object that can read and/or write the 343 file located at `path` within the collection. If you attempt to write 344 a `path` that does not exist, the file is created with `find_or_create`. 345 If the file cannot be opened for any other reason, this method raises 346 `OSError` with an appropriate errno. 347 348 Arguments: 349 350 * path: str --- The path of the file to open within this collection 351 352 * mode: str --- The mode to open this file. Supports all the same 353 values as `builtins.open`. 354 355 * encoding: str | None --- The text encoding of the file. Only used 356 when the file is opened in text mode. 
The default is 357 platform-dependent. 358 359 """ 360 if not re.search(r'^[rwa][bt]?\+?$', mode): 361 raise errors.ArgumentError("Invalid mode {!r}".format(mode)) 362 363 if mode[0] == 'r' and '+' not in mode: 364 fclass = ArvadosFileReader 365 arvfile = self.find(path) 366 elif not self.writable(): 367 raise IOError(errno.EROFS, "Collection is read only") 368 else: 369 fclass = ArvadosFileWriter 370 arvfile = self.find_or_create(path, FILE) 371 372 if arvfile is None: 373 raise IOError(errno.ENOENT, "File not found", path) 374 if not isinstance(arvfile, ArvadosFile): 375 raise IOError(errno.EISDIR, "Is a directory", path) 376 377 if mode[0] == 'w': 378 arvfile.truncate(0) 379 380 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) 381 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) 382 if 'b' not in mode: 383 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader 384 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) 385 return f 386 387 def modified(self) -> bool: 388 """Indicate whether this collection has an API server record 389 390 Returns `False` if this collection corresponds to a record loaded from 391 the API server, `True` otherwise. 392 """ 393 return not self.committed() 394 395 @synchronized 396 def committed(self): 397 """Indicate whether this collection has an API server record 398 399 Returns `True` if this collection corresponds to a record loaded from 400 the API server, `False` otherwise. 401 """ 402 return self._committed 403 404 @synchronized 405 def set_committed(self, value: bool=True): 406 """Cache whether this collection has an API server record 407 408 .. ATTENTION:: Internal 409 This method is only meant to be used by other Collection methods. 410 411 Set this collection's cached "committed" flag to the given 412 value and propagates it as needed. 
413 """ 414 if value == self._committed: 415 return 416 if value: 417 for k,v in self._items.items(): 418 v.set_committed(True) 419 self._committed = True 420 else: 421 self._committed = False 422 if self.parent is not None: 423 self.parent.set_committed(False) 424 425 @synchronized 426 def __iter__(self) -> Iterator[str]: 427 """Iterate names of streams and files in this collection 428 429 This method does not recurse. It only iterates the contents of this 430 collection's corresponding stream. 431 """ 432 return iter(self._items) 433 434 @synchronized 435 def __getitem__(self, k: str) -> CollectionItem: 436 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection 437 438 This method does not recurse. If you want to search a path, use 439 `RichCollectionBase.find` instead. 440 """ 441 return self._items[k] 442 443 @synchronized 444 def __contains__(self, k: str) -> bool: 445 """Indicate whether this collection has an item with this name 446 447 This method does not recurse. It you want to check a path, use 448 `RichCollectionBase.exists` instead. 449 """ 450 return k in self._items 451 452 @synchronized 453 def __len__(self): 454 """Get the number of items directly contained in this collection 455 456 This method does not recurse. It only counts the streams and files 457 in this collection's corresponding stream. 458 """ 459 return len(self._items) 460 461 @must_be_writable 462 @synchronized 463 def __delitem__(self, p: str) -> None: 464 """Delete an item from this collection's stream 465 466 This method does not recurse. If you want to remove an item by a 467 path, use `RichCollectionBase.remove` instead. 468 """ 469 del self._items[p] 470 self.set_committed(False) 471 self.notify(DEL, self, p, None) 472 473 @synchronized 474 def keys(self) -> Iterator[str]: 475 """Iterate names of streams and files in this collection 476 477 This method does not recurse. It only iterates the contents of this 478 collection's corresponding stream. 
479 """ 480 return self._items.keys() 481 482 @synchronized 483 def values(self) -> List[CollectionItem]: 484 """Get a list of objects in this collection's stream 485 486 The return value includes a `Subcollection` for every stream, and an 487 `arvados.arvfile.ArvadosFile` for every file, directly within this 488 collection's stream. This method does not recurse. 489 """ 490 return list(self._items.values()) 491 492 @synchronized 493 def items(self) -> List[Tuple[str, CollectionItem]]: 494 """Get a list of `(name, object)` tuples from this collection's stream 495 496 The return value includes a `Subcollection` for every stream, and an 497 `arvados.arvfile.ArvadosFile` for every file, directly within this 498 collection's stream. This method does not recurse. 499 """ 500 return list(self._items.items()) 501 502 def exists(self, path: str) -> bool: 503 """Indicate whether this collection includes an item at `path` 504 505 This method returns `True` if `path` refers to a stream or file within 506 this collection, else `False`. 507 508 Arguments: 509 510 * path: str --- The path to check for existence within this collection 511 """ 512 return self.find(path) is not None 513 514 @must_be_writable 515 @synchronized 516 def remove(self, path: str, recursive: bool=False) -> None: 517 """Remove the file or stream at `path` 518 519 Arguments: 520 521 * path: str --- The path of the item to remove from the collection 522 523 * recursive: bool --- Controls the method's behavior if `path` refers 524 to a nonempty stream. If `False` (the default), this method raises 525 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all 526 items under the stream. 
527 """ 528 if not path: 529 raise errors.ArgumentError("Parameter 'path' is empty.") 530 531 pathcomponents = path.split("/", 1) 532 item = self._items.get(pathcomponents[0]) 533 if item is None: 534 raise IOError(errno.ENOENT, "File not found", path) 535 if len(pathcomponents) == 1: 536 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: 537 raise IOError(errno.ENOTEMPTY, "Directory not empty", path) 538 deleteditem = self._items[pathcomponents[0]] 539 del self._items[pathcomponents[0]] 540 self.set_committed(False) 541 self.notify(DEL, self, pathcomponents[0], deleteditem) 542 else: 543 item.remove(pathcomponents[1], recursive=recursive) 544 545 def _clonefrom(self, source): 546 for k,v in source.items(): 547 self._items[k] = v.clone(self, k) 548 549 def clone(self): 550 raise NotImplementedError() 551 552 @must_be_writable 553 @synchronized 554 def add( 555 self, 556 source_obj: CollectionItem, 557 target_name: str, 558 overwrite: bool=False, 559 reparent: bool=False, 560 ) -> None: 561 """Copy or move a file or subcollection object to this collection 562 563 Arguments: 564 565 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection 566 to add to this collection 567 568 * target_name: str --- The path inside this collection where 569 `source_obj` should be added. 570 571 * overwrite: bool --- Controls the behavior of this method when the 572 collection already contains an object at `target_name`. If `False` 573 (the default), this method will raise `FileExistsError`. If `True`, 574 the object at `target_name` will be replaced with `source_obj`. 575 576 * reparent: bool --- Controls whether this method copies or moves 577 `source_obj`. If `False` (the default), `source_obj` is copied into 578 this collection. If `True`, `source_obj` is moved into this 579 collection. 
580 """ 581 if target_name in self and not overwrite: 582 raise IOError(errno.EEXIST, "File already exists", target_name) 583 584 modified_from = None 585 if target_name in self: 586 modified_from = self[target_name] 587 588 # Actually make the move or copy. 589 if reparent: 590 source_obj._reparent(self, target_name) 591 item = source_obj 592 else: 593 item = source_obj.clone(self, target_name) 594 595 self._items[target_name] = item 596 self.set_committed(False) 597 if not self._has_remote_blocks and source_obj.has_remote_blocks(): 598 self.set_has_remote_blocks(True) 599 600 if modified_from: 601 self.notify(MOD, self, target_name, (modified_from, item)) 602 else: 603 self.notify(ADD, self, target_name, item) 604 605 def _get_src_target(self, source, target_path, source_collection, create_dest): 606 if source_collection is None: 607 source_collection = self 608 609 # Find the object 610 if isinstance(source, str): 611 source_obj = source_collection.find(source) 612 if source_obj is None: 613 raise IOError(errno.ENOENT, "File not found", source) 614 sourcecomponents = source.split("/") 615 else: 616 source_obj = source 617 sourcecomponents = None 618 619 # Find parent collection the target path 620 targetcomponents = target_path.split("/") 621 622 # Determine the name to use. 623 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1] 624 625 if not target_name: 626 raise errors.ArgumentError("Target path is empty and source is an object. 
Cannot determine destination filename to use.") 627 628 if create_dest: 629 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) 630 else: 631 if len(targetcomponents) > 1: 632 target_dir = self.find("/".join(targetcomponents[0:-1])) 633 else: 634 target_dir = self 635 636 if target_dir is None: 637 raise IOError(errno.ENOENT, "Target directory not found", target_name) 638 639 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents: 640 target_dir = target_dir[target_name] 641 target_name = sourcecomponents[-1] 642 643 return (source_obj, target_dir, target_name) 644 645 @must_be_writable 646 @synchronized 647 def copy( 648 self, 649 source: Union[str, CollectionItem], 650 target_path: str, 651 source_collection: Optional['RichCollectionBase']=None, 652 overwrite: bool=False, 653 ) -> None: 654 """Copy a file or subcollection object to this collection 655 656 Arguments: 657 658 * source: str | arvados.arvfile.ArvadosFile | 659 arvados.collection.Subcollection --- The file or subcollection to 660 add to this collection. If `source` is a str, the object will be 661 found by looking up this path from `source_collection` (see 662 below). 663 664 * target_path: str --- The path inside this collection where the 665 source object should be added. 666 667 * source_collection: arvados.collection.Collection | None --- The 668 collection to find the source object from when `source` is a 669 path. Defaults to the current collection (`self`). 670 671 * overwrite: bool --- Controls the behavior of this method when the 672 collection already contains an object at `target_path`. If `False` 673 (the default), this method will raise `FileExistsError`. If `True`, 674 the object at `target_path` will be replaced with `source_obj`. 
675 """ 676 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) 677 target_dir.add(source_obj, target_name, overwrite, False) 678 679 @must_be_writable 680 @synchronized 681 def rename( 682 self, 683 source: Union[str, CollectionItem], 684 target_path: str, 685 source_collection: Optional['RichCollectionBase']=None, 686 overwrite: bool=False, 687 ) -> None: 688 """Move a file or subcollection object to this collection 689 690 Arguments: 691 692 * source: str | arvados.arvfile.ArvadosFile | 693 arvados.collection.Subcollection --- The file or subcollection to 694 add to this collection. If `source` is a str, the object will be 695 found by looking up this path from `source_collection` (see 696 below). 697 698 * target_path: str --- The path inside this collection where the 699 source object should be added. 700 701 * source_collection: arvados.collection.Collection | None --- The 702 collection to find the source object from when `source` is a 703 path. Defaults to the current collection (`self`). 704 705 * overwrite: bool --- Controls the behavior of this method when the 706 collection already contains an object at `target_path`. If `False` 707 (the default), this method will raise `FileExistsError`. If `True`, 708 the object at `target_path` will be replaced with `source_obj`. 709 """ 710 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) 711 if not source_obj.writable(): 712 raise IOError(errno.EROFS, "Source collection is read only", source) 713 target_dir.add(source_obj, target_name, overwrite, True) 714 715 def portable_manifest_text(self, stream_name: str=".") -> str: 716 """Get the portable manifest text for this collection 717 718 The portable manifest text is normalized, and does not include access 719 tokens. This method does not flush outstanding blocks to Keep. 
720 721 Arguments: 722 723 * stream_name: str --- The name to use for this collection's stream in 724 the generated manifest. Default `'.'`. 725 """ 726 return self._get_manifest_text(stream_name, True, True) 727 728 @synchronized 729 def manifest_text( 730 self, 731 stream_name: str=".", 732 strip: bool=False, 733 normalize: bool=False, 734 only_committed: bool=False, 735 ) -> str: 736 """Get the manifest text for this collection 737 738 Arguments: 739 740 * stream_name: str --- The name to use for this collection's stream in 741 the generated manifest. Default `'.'`. 742 743 * strip: bool --- Controls whether or not the returned manifest text 744 includes access tokens. If `False` (the default), the manifest text 745 will include access tokens. If `True`, the manifest text will not 746 include access tokens. 747 748 * normalize: bool --- Controls whether or not the returned manifest 749 text is normalized. Default `False`. 750 751 * only_committed: bool --- Controls whether or not this method uploads 752 pending data to Keep before building and returning the manifest text. 753 If `False` (the default), this method will finish uploading all data 754 to Keep, then return the final manifest. If `True`, this method will 755 build and return a manifest that only refers to the data that has 756 finished uploading at the time this method was called. 757 """ 758 if not only_committed: 759 self._my_block_manager().commit_all() 760 return self._get_manifest_text(stream_name, strip, normalize, 761 only_committed=only_committed) 762 763 @synchronized 764 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 765 """Get the manifest text for this collection, sub collections and files. 766 767 :stream_name: 768 Name to use for this stream (directory) 769 770 :strip: 771 If True, remove signing tokens from block locators if present. 772 If False (default), block locators are left unchanged. 
773 774 :normalize: 775 If True, always export the manifest text in normalized form 776 even if the Collection is not modified. If False (default) and the collection 777 is not modified, return the original manifest text even if it is not 778 in normalized form. 779 780 :only_committed: 781 If True, only include blocks that were already committed to Keep. 782 783 """ 784 785 if not self.committed() or self._manifest_text is None or normalize: 786 stream = {} 787 buf = [] 788 sorted_keys = sorted(self.keys()) 789 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]: 790 # Create a stream per file `k` 791 arvfile = self[filename] 792 filestream = [] 793 for segment in arvfile.segments(): 794 loc = segment.locator 795 if arvfile.parent._my_block_manager().is_bufferblock(loc): 796 if only_committed: 797 continue 798 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator() 799 if strip: 800 loc = KeepLocator(loc).stripped() 801 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 802 segment.segment_offset, segment.range_size)) 803 stream[filename] = filestream 804 if stream: 805 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n") 806 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]: 807 buf.append(self[dirname].manifest_text( 808 stream_name=os.path.join(stream_name, dirname), 809 strip=strip, normalize=True, only_committed=only_committed)) 810 return "".join(buf) 811 else: 812 if strip: 813 return self.stripped_manifest() 814 else: 815 return self._manifest_text 816 817 @synchronized 818 def _copy_remote_blocks(self, remote_blocks={}): 819 """Scan through the entire collection and ask Keep to copy remote blocks. 820 821 When accessing a remote collection, blocks will have a remote signature 822 (+R instead of +A). Collect these signatures and request Keep to copy the 823 blocks to the local cluster, returning local (+A) signatures. 
824 825 :remote_blocks: 826 Shared cache of remote to local block mappings. This is used to avoid 827 doing extra work when blocks are shared by more than one file in 828 different subdirectories. 829 830 """ 831 for item in self: 832 remote_blocks = self[item]._copy_remote_blocks(remote_blocks) 833 return remote_blocks 834 835 @synchronized 836 def diff( 837 self, 838 end_collection: 'RichCollectionBase', 839 prefix: str=".", 840 holding_collection: Optional['Collection']=None, 841 ) -> ChangeList: 842 """Build a list of differences between this collection and another 843 844 Arguments: 845 846 * end_collection: arvados.collection.RichCollectionBase --- A 847 collection object with the desired end state. The returned diff 848 list will describe how to go from the current collection object 849 `self` to `end_collection`. 850 851 * prefix: str --- The name to use for this collection's stream in 852 the diff list. Default `'.'`. 853 854 * holding_collection: arvados.collection.Collection | None --- A 855 collection object used to hold objects for the returned diff 856 list. By default, a new empty collection is created. 
857 """ 858 changes = [] 859 if holding_collection is None: 860 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep()) 861 for k in self: 862 if k not in end_collection: 863 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, ""))) 864 for k in end_collection: 865 if k in self: 866 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection): 867 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection)) 868 elif end_collection[k] != self[k]: 869 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 870 else: 871 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 872 else: 873 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, ""))) 874 return changes 875 876 @must_be_writable 877 @synchronized 878 def apply(self, changes: ChangeList) -> None: 879 """Apply a list of changes from to this collection 880 881 This method takes a list of changes generated by 882 `RichCollectionBase.diff` and applies it to this 883 collection. Afterward, the state of this collection object will 884 match the state of `end_collection` passed to `diff`. If a change 885 conflicts with a local change, it will be saved to an alternate path 886 indicating the conflict. 887 888 Arguments: 889 890 * changes: arvados.collection.ChangeList --- The list of differences 891 generated by `RichCollectionBase.diff`. 
892 """ 893 if changes: 894 self.set_committed(False) 895 for change in changes: 896 event_type = change[0] 897 path = change[1] 898 initial = change[2] 899 local = self.find(path) 900 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S", 901 time.gmtime())) 902 if event_type == ADD: 903 if local is None: 904 # No local file at path, safe to copy over new file 905 self.copy(initial, path) 906 elif local is not None and local != initial: 907 # There is already local file and it is different: 908 # save change to conflict file. 909 self.copy(initial, conflictpath) 910 elif event_type == MOD or event_type == TOK: 911 final = change[3] 912 if local == initial: 913 # Local matches the "initial" item so it has not 914 # changed locally and is safe to update. 915 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile): 916 # Replace contents of local file with new contents 917 local.replace_contents(final) 918 else: 919 # Overwrite path with new item; this can happen if 920 # path was a file and is now a collection or vice versa 921 self.copy(final, path, overwrite=True) 922 else: 923 # Local is missing (presumably deleted) or local doesn't 924 # match the "start" value, so save change to conflict file 925 self.copy(final, conflictpath) 926 elif event_type == DEL: 927 if local == initial: 928 # Local item matches "initial" value, so it is safe to remove. 929 self.remove(path, recursive=True) 930 # else, the file is modified or already removed, in either 931 # case we don't want to try to remove it. 932 933 def portable_data_hash(self) -> str: 934 """Get the portable data hash for this collection's manifest""" 935 if self._manifest_locator and self.committed(): 936 # If the collection is already saved on the API server, and it's committed 937 # then return API server's PDH response. 
938 return self._portable_data_hash 939 else: 940 stripped = self.portable_manifest_text().encode() 941 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) 942 943 @synchronized 944 def subscribe(self, callback: ChangeCallback) -> None: 945 """Set a notify callback for changes to this collection 946 947 Arguments: 948 949 * callback: arvados.collection.ChangeCallback --- The callable to 950 call each time the collection is changed. 951 """ 952 if self._callback is None: 953 self._callback = callback 954 else: 955 raise errors.ArgumentError("A callback is already set on this collection.") 956 957 @synchronized 958 def unsubscribe(self) -> None: 959 """Remove any notify callback set for changes to this collection""" 960 if self._callback is not None: 961 self._callback = None 962 963 @synchronized 964 def notify( 965 self, 966 event: ChangeType, 967 collection: 'RichCollectionBase', 968 name: str, 969 item: CollectionItem, 970 ) -> None: 971 """Notify any subscribed callback about a change to this collection 972 973 .. ATTENTION:: Internal 974 This method is only meant to be used by other Collection methods. 975 976 If a callback has been registered with `RichCollectionBase.subscribe`, 977 it will be called with information about a change to this collection. 978 Then this notification will be propagated to this collection's root. 979 980 Arguments: 981 982 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to 983 the collection. 984 985 * collection: arvados.collection.RichCollectionBase --- The 986 collection that was modified. 987 988 * name: str --- The name of the file or stream within `collection` that 989 was modified. 990 991 * item: arvados.arvfile.ArvadosFile | 992 arvados.collection.Subcollection --- The new contents at `name` 993 within `collection`. 
994 """ 995 if self._callback: 996 self._callback(event, collection, name, item) 997 self.root_collection().notify(event, collection, name, item) 998 999 @synchronized 1000 def __eq__(self, other: Any) -> bool: 1001 """Indicate whether this collection object is equal to another""" 1002 if other is self: 1003 return True 1004 if not isinstance(other, RichCollectionBase): 1005 return False 1006 if len(self._items) != len(other): 1007 return False 1008 for k in self._items: 1009 if k not in other: 1010 return False 1011 if self._items[k] != other[k]: 1012 return False 1013 return True 1014 1015 def __ne__(self, other: Any) -> bool: 1016 """Indicate whether this collection object is not equal to another""" 1017 return not self.__eq__(other) 1018 1019 @synchronized 1020 def flush(self) -> None: 1021 """Upload any pending data to Keep""" 1022 for e in self.values(): 1023 e.flush() 1024 1025 1026class Collection(RichCollectionBase): 1027 """Read and manipulate an Arvados collection 1028 1029 This class provides a high-level interface to create, read, and update 1030 Arvados collections and their contents. Refer to the Arvados Python SDK 1031 cookbook for [an introduction to using the Collection class][cookbook]. 
1032 1033 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 1034 """ 1035 1036 def __init__(self, manifest_locator_or_text: Optional[str]=None, 1037 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, 1038 keep_client: Optional['arvados.keep.KeepClient']=None, 1039 num_retries: int=10, 1040 parent: Optional['Collection']=None, 1041 apiconfig: Optional[Mapping[str, str]]=None, 1042 block_manager: Optional['arvados.arvfile._BlockManager']=None, 1043 replication_desired: Optional[int]=None, 1044 storage_classes_desired: Optional[List[str]]=None, 1045 put_threads: Optional[int]=None): 1046 """Initialize a Collection object 1047 1048 Arguments: 1049 1050 * manifest_locator_or_text: str | None --- This string can contain a 1051 collection manifest text, portable data hash, or UUID. When given a 1052 portable data hash or UUID, this instance will load a collection 1053 record from the API server. Otherwise, this instance will represent a 1054 new collection without an API server record. The default value `None` 1055 instantiates a new collection with an empty manifest. 1056 1057 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The 1058 Arvados API client object this instance uses to make requests. If 1059 none is given, this instance creates its own client using the 1060 settings from `apiconfig` (see below). If your client instantiates 1061 many Collection objects, you can help limit memory utilization by 1062 calling `arvados.api.api` to construct an 1063 `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client` 1064 for every Collection. 1065 1066 * keep_client: arvados.keep.KeepClient | None --- The Keep client 1067 object this instance uses to make requests. If none is given, this 1068 instance creates its own client using its `api_client`. 1069 1070 * num_retries: int --- The number of times that client requests are 1071 retried. Default 10. 
1072 1073 * parent: arvados.collection.Collection | None --- The parent Collection 1074 object of this instance, if any. This argument is primarily used by 1075 other Collection methods; user client code shouldn't need to use it. 1076 1077 * apiconfig: Mapping[str, str] | None --- A mapping with entries for 1078 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally 1079 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the 1080 Collection object constructs one from these settings. If no 1081 mapping is provided, calls `arvados.config.settings` to get these 1082 parameters from user configuration. 1083 1084 * block_manager: arvados.arvfile._BlockManager | None --- The 1085 _BlockManager object used by this instance to coordinate reading 1086 and writing Keep data blocks. If none is given, this instance 1087 constructs its own. This argument is primarily used by other 1088 Collection methods; user client code shouldn't need to use it. 1089 1090 * replication_desired: int | None --- This controls both the value of 1091 the `replication_desired` field on API collection records saved by 1092 this class, as well as the number of Keep services that the object 1093 writes new data blocks to. If none is given, uses the default value 1094 configured for the cluster. 1095 1096 * storage_classes_desired: list[str] | None --- This controls both 1097 the value of the `storage_classes_desired` field on API collection 1098 records saved by this class, as well as selecting which specific 1099 Keep services the object writes new data blocks to. If none is 1100 given, defaults to an empty list. 1101 1102 * put_threads: int | None --- The number of threads to run 1103 simultaneously to upload data blocks to Keep. This value is used when 1104 building a new `block_manager`. It is unused when a `block_manager` 1105 is provided. 
1106 """ 1107 1108 if storage_classes_desired and type(storage_classes_desired) is not list: 1109 raise errors.ArgumentError("storage_classes_desired must be list type.") 1110 1111 super(Collection, self).__init__(parent) 1112 self._api_client = api_client 1113 self._keep_client = keep_client 1114 1115 # Use the keep client from ThreadSafeApiCache 1116 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache): 1117 self._keep_client = self._api_client.keep 1118 1119 self._block_manager = block_manager 1120 self.replication_desired = replication_desired 1121 self._storage_classes_desired = storage_classes_desired 1122 self.put_threads = put_threads 1123 1124 if apiconfig: 1125 self._config = apiconfig 1126 else: 1127 self._config = config.settings() 1128 1129 self.num_retries = num_retries 1130 self._manifest_locator = None 1131 self._manifest_text = None 1132 self._portable_data_hash = None 1133 self._api_response = None 1134 self._past_versions = set() 1135 1136 self.lock = threading.RLock() 1137 self.events = None 1138 1139 if manifest_locator_or_text: 1140 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1141 self._manifest_locator = manifest_locator_or_text 1142 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1143 self._manifest_locator = manifest_locator_or_text 1144 if not self._has_local_collection_uuid(): 1145 self._has_remote_blocks = True 1146 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1147 self._manifest_text = manifest_locator_or_text 1148 if '+R' in self._manifest_text: 1149 self._has_remote_blocks = True 1150 else: 1151 raise errors.ArgumentError( 1152 "Argument to CollectionReader is not a manifest or a collection UUID") 1153 1154 try: 1155 self._populate() 1156 except errors.SyntaxError as e: 1157 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None 1158 1159 def storage_classes_desired(self) -> List[str]: 1160 
"""Get this collection's `storage_classes_desired` value""" 1161 return self._storage_classes_desired or [] 1162 1163 def root_collection(self) -> 'Collection': 1164 return self 1165 1166 def get_properties(self) -> Properties: 1167 """Get this collection's properties 1168 1169 This method always returns a dict. If this collection object does not 1170 have an associated API record, or that record does not have any 1171 properties set, this method returns an empty dict. 1172 """ 1173 if self._api_response and self._api_response["properties"]: 1174 return self._api_response["properties"] 1175 else: 1176 return {} 1177 1178 def get_trash_at(self) -> Optional[datetime.datetime]: 1179 """Get this collection's `trash_at` field 1180 1181 This method parses the `trash_at` field of the collection's API 1182 record and returns a datetime from it. If that field is not set, or 1183 this collection object does not have an associated API record, 1184 returns None. 1185 """ 1186 if self._api_response and self._api_response["trash_at"]: 1187 try: 1188 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1189 except ValueError: 1190 return None 1191 else: 1192 return None 1193 1194 def stream_name(self) -> str: 1195 return "." 1196 1197 def writable(self) -> bool: 1198 return True 1199 1200 @synchronized 1201 def known_past_version( 1202 self, 1203 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] 1204 ) -> bool: 1205 """Indicate whether an API record for this collection has been seen before 1206 1207 As this collection object loads records from the API server, it records 1208 their `modified_at` and `portable_data_hash` fields. This method accepts 1209 a 2-tuple with values for those fields, and returns `True` if the 1210 combination was previously loaded. 
1211 """ 1212 return modified_at_and_portable_data_hash in self._past_versions 1213 1214 @synchronized 1215 @retry_method 1216 def update( 1217 self, 1218 other: Optional['Collection']=None, 1219 num_retries: Optional[int]=None, 1220 ) -> None: 1221 """Merge another collection's contents into this one 1222 1223 This method compares the manifest of this collection instance with 1224 another, then updates this instance's manifest with changes from the 1225 other, renaming files to flag conflicts where necessary. 1226 1227 When called without any arguments, this method reloads the collection's 1228 API record, and updates this instance with any changes that have 1229 appeared server-side. If this instance does not have a corresponding 1230 API record, this method raises `arvados.errors.ArgumentError`. 1231 1232 Arguments: 1233 1234 * other: arvados.collection.Collection | None --- The collection 1235 whose contents should be merged into this instance. When not 1236 provided, this method reloads this collection's API record and 1237 constructs a Collection object from it. If this instance does not 1238 have a corresponding API record, this method raises 1239 `arvados.errors.ArgumentError`. 1240 1241 * num_retries: int | None --- The number of times to retry reloading 1242 the collection's API record from the API server. If not specified, 1243 uses the `num_retries` provided when this instance was constructed. 
1244 """ 1245 if other is None: 1246 if self._manifest_locator is None: 1247 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1248 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1249 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and 1250 response.get("portable_data_hash") != self.portable_data_hash()): 1251 # The record on the server is different from our current one, but we've seen it before, 1252 # so ignore it because it's already been merged. 1253 # However, if it's the same as our current record, proceed with the update, because we want to update 1254 # our tokens. 1255 return 1256 else: 1257 self._remember_api_response(response) 1258 other = CollectionReader(response["manifest_text"]) 1259 baseline = CollectionReader(self._manifest_text) 1260 self.apply(baseline.diff(other)) 1261 self._manifest_text = self.manifest_text() 1262 1263 @synchronized 1264 def _my_api(self): 1265 if self._api_client is None: 1266 self._api_client = ThreadSafeApiCache(self._config, version='v1') 1267 if self._keep_client is None: 1268 self._keep_client = self._api_client.keep 1269 return self._api_client 1270 1271 @synchronized 1272 def _my_keep(self): 1273 if self._keep_client is None: 1274 if self._api_client is None: 1275 self._my_api() 1276 else: 1277 self._keep_client = KeepClient(api_client=self._api_client) 1278 return self._keep_client 1279 1280 @synchronized 1281 def _my_block_manager(self): 1282 if self._block_manager is None: 1283 copies = (self.replication_desired or 1284 self._my_api()._rootDesc.get('defaultCollectionReplication', 1285 2)) 1286 self._block_manager = _BlockManager(self._my_keep(), 1287 copies=copies, 1288 put_threads=self.put_threads, 1289 num_retries=self.num_retries, 1290 storage_classes_func=self.storage_classes_desired) 1291 return self._block_manager 1292 1293 def _remember_api_response(self, 
response): 1294 self._api_response = response 1295 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) 1296 1297 def _populate_from_api_server(self): 1298 # As in KeepClient itself, we must wait until the last 1299 # possible moment to instantiate an API client, in order to 1300 # avoid tripping up clients that don't have access to an API 1301 # server. If we do build one, make sure our Keep client uses 1302 # it. If instantiation fails, we'll fall back to the except 1303 # clause, just like any other Collection lookup 1304 # failure. Return an exception, or None if successful. 1305 self._remember_api_response(self._my_api().collections().get( 1306 uuid=self._manifest_locator).execute( 1307 num_retries=self.num_retries)) 1308 self._manifest_text = self._api_response['manifest_text'] 1309 self._portable_data_hash = self._api_response['portable_data_hash'] 1310 # If not overriden via kwargs, we should try to load the 1311 # replication_desired and storage_classes_desired from the API server 1312 if self.replication_desired is None: 1313 self.replication_desired = self._api_response.get('replication_desired', None) 1314 if self._storage_classes_desired is None: 1315 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None) 1316 1317 def _populate(self): 1318 if self._manifest_text is None: 1319 if self._manifest_locator is None: 1320 return 1321 else: 1322 self._populate_from_api_server() 1323 self._baseline_manifest = self._manifest_text 1324 self._import_manifest(self._manifest_text) 1325 1326 def _has_collection_uuid(self): 1327 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) 1328 1329 def _has_local_collection_uuid(self): 1330 return self._has_collection_uuid and \ 1331 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0] 1332 1333 def __enter__(self): 1334 return self 1335 1336 def __exit__(self, 
exc_type, exc_value, traceback): 1337 """Exit a context with this collection instance 1338 1339 If no exception was raised inside the context block, and this 1340 collection is writable and has a corresponding API record, that 1341 record will be updated to match the state of this instance at the end 1342 of the block. 1343 """ 1344 if exc_type is None: 1345 if self.writable() and self._has_collection_uuid(): 1346 self.save() 1347 self.stop_threads() 1348 1349 def stop_threads(self) -> None: 1350 """Stop background Keep upload/download threads""" 1351 if self._block_manager is not None: 1352 self._block_manager.stop_threads() 1353 1354 @synchronized 1355 def manifest_locator(self) -> Optional[str]: 1356 """Get this collection's manifest locator, if any 1357 1358 * If this collection instance is associated with an API record with a 1359 UUID, return that. 1360 * Otherwise, if this collection instance was loaded from an API record 1361 by portable data hash, return that. 1362 * Otherwise, return `None`. 1363 """ 1364 return self._manifest_locator 1365 1366 @synchronized 1367 def clone( 1368 self, 1369 new_parent: Optional['Collection']=None, 1370 new_name: Optional[str]=None, 1371 readonly: bool=False, 1372 new_config: Optional[Mapping[str, str]]=None, 1373 ) -> 'Collection': 1374 """Create a Collection object with the same contents as this instance 1375 1376 This method creates a new Collection object with contents that match 1377 this instance's. The new collection will not be associated with any API 1378 record. 1379 1380 Arguments: 1381 1382 * new_parent: arvados.collection.Collection | None --- This value is 1383 passed to the new Collection's constructor as the `parent` 1384 argument. 1385 1386 * new_name: str | None --- This value is unused. 1387 1388 * readonly: bool --- If this value is true, this method constructs and 1389 returns a `CollectionReader`. Otherwise, it returns a mutable 1390 `Collection`. Default `False`. 
1391 1392 * new_config: Mapping[str, str] | None --- This value is passed to the 1393 new Collection's constructor as `apiconfig`. If no value is provided, 1394 defaults to the configuration passed to this instance's constructor. 1395 """ 1396 if new_config is None: 1397 new_config = self._config 1398 if readonly: 1399 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1400 else: 1401 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1402 1403 newcollection._clonefrom(self) 1404 return newcollection 1405 1406 @synchronized 1407 def api_response(self) -> Optional[Dict[str, Any]]: 1408 """Get this instance's associated API record 1409 1410 If this Collection instance has an associated API record, return it. 1411 Otherwise, return `None`. 1412 """ 1413 return self._api_response 1414 1415 def find_or_create( 1416 self, 1417 path: str, 1418 create_type: CreateType, 1419 ) -> CollectionItem: 1420 if path == ".": 1421 return self 1422 else: 1423 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type) 1424 1425 def find(self, path: str) -> CollectionItem: 1426 if path == ".": 1427 return self 1428 else: 1429 return super(Collection, self).find(path[2:] if path.startswith("./") else path) 1430 1431 def remove(self, path: str, recursive: bool=False) -> None: 1432 if path == ".": 1433 raise errors.ArgumentError("Cannot remove '.'") 1434 else: 1435 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive) 1436 1437 @must_be_writable 1438 @synchronized 1439 @retry_method 1440 def save( 1441 self, 1442 properties: Optional[Properties]=None, 1443 storage_classes: Optional[StorageClasses]=None, 1444 trash_at: Optional[datetime.datetime]=None, 1445 merge: bool=True, 1446 num_retries: Optional[int]=None, 1447 preserve_version: bool=False, 1448 ) -> str: 1449 """Save collection to an existing API record 1450 1451 This method updates the instance's corresponding 
API record to match 1452 the instance's state. If this instance does not have a corresponding API 1453 record yet, raises `AssertionError`. (To create a new API record, use 1454 `Collection.save_new`.) This method returns the saved collection 1455 manifest. 1456 1457 Arguments: 1458 1459 * properties: dict[str, Any] | None --- If provided, the API record will 1460 be updated with these properties. Note this will completely replace 1461 any existing properties. 1462 1463 * storage_classes: list[str] | None --- If provided, the API record will 1464 be updated with this value in the `storage_classes_desired` field. 1465 This value will also be saved on the instance and used for any 1466 changes that follow. 1467 1468 * trash_at: datetime.datetime | None --- If provided, the API record 1469 will be updated with this value in the `trash_at` field. 1470 1471 * merge: bool --- If `True` (the default), this method will first 1472 reload this collection's API record, and merge any new contents into 1473 this instance before saving changes. See `Collection.update` for 1474 details. 1475 1476 * num_retries: int | None --- The number of times to retry reloading 1477 the collection's API record from the API server. If not specified, 1478 uses the `num_retries` provided when this instance was constructed. 1479 1480 * preserve_version: bool --- This value will be passed to directly 1481 to the underlying API call. If `True`, the Arvados API will 1482 preserve the versions of this collection both immediately before 1483 and after the update. If `True` when the API server is not 1484 configured with collection versioning, this method raises 1485 `arvados.errors.ArgumentError`. 
1486 """ 1487 if properties and type(properties) is not dict: 1488 raise errors.ArgumentError("properties must be dictionary type.") 1489 1490 if storage_classes and type(storage_classes) is not list: 1491 raise errors.ArgumentError("storage_classes must be list type.") 1492 if storage_classes: 1493 self._storage_classes_desired = storage_classes 1494 1495 if trash_at and type(trash_at) is not datetime.datetime: 1496 raise errors.ArgumentError("trash_at must be datetime type.") 1497 1498 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1499 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1500 1501 body={} 1502 if properties: 1503 body["properties"] = properties 1504 if self.storage_classes_desired(): 1505 body["storage_classes_desired"] = self.storage_classes_desired() 1506 if trash_at: 1507 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1508 body["trash_at"] = t 1509 if preserve_version: 1510 body["preserve_version"] = preserve_version 1511 1512 if not self.committed(): 1513 if self._has_remote_blocks: 1514 # Copy any remote blocks to the local cluster. 1515 self._copy_remote_blocks(remote_blocks={}) 1516 self._has_remote_blocks = False 1517 if not self._has_collection_uuid(): 1518 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") 1519 elif not self._has_local_collection_uuid(): 1520 raise AssertionError("Collection manifest_locator is from a remote cluster. 
Use save_new() to save it on the local cluster.") 1521 1522 self._my_block_manager().commit_all() 1523 1524 if merge: 1525 self.update() 1526 1527 text = self.manifest_text(strip=False) 1528 body['manifest_text'] = text 1529 1530 self._remember_api_response(self._my_api().collections().update( 1531 uuid=self._manifest_locator, 1532 body=body 1533 ).execute(num_retries=num_retries)) 1534 self._manifest_text = self._api_response["manifest_text"] 1535 self._portable_data_hash = self._api_response["portable_data_hash"] 1536 self.set_committed(True) 1537 elif body: 1538 self._remember_api_response(self._my_api().collections().update( 1539 uuid=self._manifest_locator, 1540 body=body 1541 ).execute(num_retries=num_retries)) 1542 1543 return self._manifest_text 1544 1545 1546 @must_be_writable 1547 @synchronized 1548 @retry_method 1549 def save_new( 1550 self, 1551 name: Optional[str]=None, 1552 create_collection_record: bool=True, 1553 owner_uuid: Optional[str]=None, 1554 properties: Optional[Properties]=None, 1555 storage_classes: Optional[StorageClasses]=None, 1556 trash_at: Optional[datetime.datetime]=None, 1557 ensure_unique_name: bool=False, 1558 num_retries: Optional[int]=None, 1559 preserve_version: bool=False, 1560 ): 1561 """Save collection to a new API record 1562 1563 This method finishes uploading new data blocks and (optionally) 1564 creates a new API collection record with the provided data. If a new 1565 record is created, this instance becomes associated with that record 1566 for future updates like `save()`. This method returns the saved 1567 collection manifest. 1568 1569 Arguments: 1570 1571 * name: str | None --- The `name` field to use on the new collection 1572 record. If not specified, a generic default name is generated. 1573 1574 * create_collection_record: bool --- If `True` (the default), creates a 1575 collection record on the API server. 
If `False`, the method finishes 1576 all data uploads and only returns the resulting collection manifest 1577 without sending it to the API server. 1578 1579 * owner_uuid: str | None --- The `owner_uuid` field to use on the 1580 new collection record. 1581 1582 * properties: dict[str, Any] | None --- The `properties` field to use on 1583 the new collection record. 1584 1585 * storage_classes: list[str] | None --- The 1586 `storage_classes_desired` field to use on the new collection record. 1587 1588 * trash_at: datetime.datetime | None --- The `trash_at` field to use 1589 on the new collection record. 1590 1591 * ensure_unique_name: bool --- This value is passed directly to the 1592 Arvados API when creating the collection record. If `True`, the API 1593 server may modify the submitted `name` to ensure the collection's 1594 `name`+`owner_uuid` combination is unique. If `False` (the default), 1595 if a collection already exists with this same `name`+`owner_uuid` 1596 combination, creating a collection record will raise a validation 1597 error. 1598 1599 * num_retries: int | None --- The number of times to retry reloading 1600 the collection's API record from the API server. If not specified, 1601 uses the `num_retries` provided when this instance was constructed. 1602 1603 * preserve_version: bool --- This value will be passed to directly 1604 to the underlying API call. If `True`, the Arvados API will 1605 preserve the versions of this collection both immediately before 1606 and after the update. If `True` when the API server is not 1607 configured with collection versioning, this method raises 1608 `arvados.errors.ArgumentError`. 
1609 """ 1610 if properties and type(properties) is not dict: 1611 raise errors.ArgumentError("properties must be dictionary type.") 1612 1613 if storage_classes and type(storage_classes) is not list: 1614 raise errors.ArgumentError("storage_classes must be list type.") 1615 1616 if trash_at and type(trash_at) is not datetime.datetime: 1617 raise errors.ArgumentError("trash_at must be datetime type.") 1618 1619 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1620 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1621 1622 if self._has_remote_blocks: 1623 # Copy any remote blocks to the local cluster. 1624 self._copy_remote_blocks(remote_blocks={}) 1625 self._has_remote_blocks = False 1626 1627 if storage_classes: 1628 self._storage_classes_desired = storage_classes 1629 1630 self._my_block_manager().commit_all() 1631 text = self.manifest_text(strip=False) 1632 1633 if create_collection_record: 1634 if name is None: 1635 name = "New collection" 1636 ensure_unique_name = True 1637 1638 body = {"manifest_text": text, 1639 "name": name, 1640 "replication_desired": self.replication_desired} 1641 if owner_uuid: 1642 body["owner_uuid"] = owner_uuid 1643 if properties: 1644 body["properties"] = properties 1645 if self.storage_classes_desired(): 1646 body["storage_classes_desired"] = self.storage_classes_desired() 1647 if trash_at: 1648 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1649 body["trash_at"] = t 1650 if preserve_version: 1651 body["preserve_version"] = preserve_version 1652 1653 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) 1654 text = self._api_response["manifest_text"] 1655 1656 self._manifest_locator = self._api_response["uuid"] 1657 self._portable_data_hash = self._api_response["portable_data_hash"] 1658 1659 self._manifest_text = text 1660 
self.set_committed(True) 1661 1662 return text 1663 1664 _token_re = re.compile(r'(\S+)(\s+|$)') 1665 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') 1666 _segment_re = re.compile(r'(\d+):(\d+):(\S+)') 1667 1668 def _unescape_manifest_path(self, path): 1669 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) 1670 1671 @synchronized 1672 def _import_manifest(self, manifest_text): 1673 """Import a manifest into a `Collection`. 1674 1675 :manifest_text: 1676 The manifest text to import from. 1677 1678 """ 1679 if len(self) > 0: 1680 raise ArgumentError("Can only import manifest into an empty collection") 1681 1682 STREAM_NAME = 0 1683 BLOCKS = 1 1684 SEGMENTS = 2 1685 1686 stream_name = None 1687 state = STREAM_NAME 1688 1689 for token_and_separator in self._token_re.finditer(manifest_text): 1690 tok = token_and_separator.group(1) 1691 sep = token_and_separator.group(2) 1692 1693 if state == STREAM_NAME: 1694 # starting a new stream 1695 stream_name = self._unescape_manifest_path(tok) 1696 blocks = [] 1697 segments = [] 1698 streamoffset = 0 1699 state = BLOCKS 1700 self.find_or_create(stream_name, COLLECTION) 1701 continue 1702 1703 if state == BLOCKS: 1704 block_locator = self._block_re.match(tok) 1705 if block_locator: 1706 blocksize = int(block_locator.group(1)) 1707 blocks.append(Range(tok, streamoffset, blocksize, 0)) 1708 streamoffset += blocksize 1709 else: 1710 state = SEGMENTS 1711 1712 if state == SEGMENTS: 1713 file_segment = self._segment_re.match(tok) 1714 if file_segment: 1715 pos = int(file_segment.group(1)) 1716 size = int(file_segment.group(2)) 1717 name = self._unescape_manifest_path(file_segment.group(3)) 1718 if name.split('/')[-1] == '.': 1719 # placeholder for persisting an empty directory, not a real file 1720 if len(name) > 2: 1721 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) 1722 else: 1723 filepath = os.path.join(stream_name, name) 1724 try: 1725 afile = self.find_or_create(filepath, 
FILE) 1726 except IOError as e: 1727 if e.errno == errno.ENOTDIR: 1728 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None 1729 else: 1730 raise e from None 1731 if isinstance(afile, ArvadosFile): 1732 afile.add_segment(blocks, pos, size) 1733 else: 1734 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) 1735 else: 1736 # error! 1737 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok) 1738 1739 if sep == "\n": 1740 stream_name = None 1741 state = STREAM_NAME 1742 1743 self.set_committed(True) 1744 1745 @synchronized 1746 def notify( 1747 self, 1748 event: ChangeType, 1749 collection: 'RichCollectionBase', 1750 name: str, 1751 item: CollectionItem, 1752 ) -> None: 1753 if self._callback: 1754 self._callback(event, collection, name, item) 1755 1756 1757class Subcollection(RichCollectionBase): 1758 """Read and manipulate a stream/directory within an Arvados collection 1759 1760 This class represents a single stream (like a directory) within an Arvados 1761 `Collection`. It is returned by `Collection.find` and provides the same API. 1762 Operations that work on the API collection record propagate to the parent 1763 `Collection` object. 
1764 """ 1765 1766 def __init__(self, parent, name): 1767 super(Subcollection, self).__init__(parent) 1768 self.lock = self.root_collection().lock 1769 self._manifest_text = None 1770 self.name = name 1771 self.num_retries = parent.num_retries 1772 1773 def root_collection(self) -> 'Collection': 1774 return self.parent.root_collection() 1775 1776 def writable(self) -> bool: 1777 return self.root_collection().writable() 1778 1779 def _my_api(self): 1780 return self.root_collection()._my_api() 1781 1782 def _my_keep(self): 1783 return self.root_collection()._my_keep() 1784 1785 def _my_block_manager(self): 1786 return self.root_collection()._my_block_manager() 1787 1788 def stream_name(self) -> str: 1789 return os.path.join(self.parent.stream_name(), self.name) 1790 1791 @synchronized 1792 def clone( 1793 self, 1794 new_parent: Optional['Collection']=None, 1795 new_name: Optional[str]=None, 1796 ) -> 'Subcollection': 1797 c = Subcollection(new_parent, new_name) 1798 c._clonefrom(self) 1799 return c 1800 1801 @must_be_writable 1802 @synchronized 1803 def _reparent(self, newparent, newname): 1804 self.set_committed(False) 1805 self.flush() 1806 self.parent.remove(self.name, recursive=True) 1807 self.parent = newparent 1808 self.name = newname 1809 self.lock = self.parent.root_collection().lock 1810 1811 @synchronized 1812 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 1813 """Encode empty directories by using an \056-named (".") empty file""" 1814 if len(self._items) == 0: 1815 return "%s %s 0:0:\\056\n" % ( 1816 escape(stream_name), config.EMPTY_BLOCK_LOCATOR) 1817 return super(Subcollection, self)._get_manifest_text(stream_name, 1818 strip, normalize, 1819 only_committed) 1820 1821 1822class CollectionReader(Collection): 1823 """Read-only `Collection` subclass 1824 1825 This class will never create or update any API collection records. 
You can 1826 use this class for additional code safety when you only need to read 1827 existing collections. 1828 """ 1829 def __init__(self, manifest_locator_or_text, *args, **kwargs): 1830 self._in_init = True 1831 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs) 1832 self._in_init = False 1833 1834 # Forego any locking since it should never change once initialized. 1835 self.lock = NoopLock() 1836 1837 # Backwards compatability with old CollectionReader 1838 # all_streams() and all_files() 1839 self._streams = None 1840 1841 def writable(self) -> bool: 1842 return self._in_init
Argument value for Collection methods to represent an added item
Argument value for Collection methods to represent a removed item
Argument value for Collection methods to represent a modified item
Argument value for Collection methods to represent an item with token differences
create_type value for Collection.find_or_create
create_type value for Collection.find_or_create
class CollectionBase(object):
    """Abstract base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __enter__(self):
        """Enter a context block with this collection instance"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context block with this collection instance

        Nothing is cleaned up here; the method exists so collections can be
        used in `with` statements.
        """
        return None

    def _my_keep(self):
        # Build the Keep client lazily on first use and cache it on the
        # instance. `_keep_client`, `_api_client` and `num_retries` are
        # expected to be set by subclasses.
        if self._keep_client is None:
            self._keep_client = KeepClient(
                api_client=self._api_client,
                num_retries=self.num_retries,
            )
        return self._keep_client

    def stripped_manifest(self) -> str:
        """Create a copy of the collection manifest with only size hints

        This method returns a string with the current collection's manifest
        text with all non-portable locator hints like permission hints and
        remote cluster hints removed. The only hints in the returned manifest
        will be size hints.
        """
        pieces = []
        for line in self.manifest_text().split("\n"):
            tokens = line.split()
            if not tokens:
                # Blank lines (including the trailing one) are dropped.
                continue
            stream, *rest = tokens
            kept = [stream]
            for token in rest:
                # Only Keep locators are rewritten: strip every "+hint" whose
                # first character is not a digit, which leaves the size hint.
                if re.match(arvados.util.keep_locator_pattern, token):
                    token = re.sub(r'\+[^\d][^\+]*', '', token)
                kept.append(token)
            pieces.append(' '.join(kept))
            pieces.append("\n")
        return ''.join(pieces)
Abstract base class for Collection classes
def stripped_manifest(self) -> str:
    """Create a copy of the collection manifest with only size hints

    This method returns a string with the current collection's manifest
    text with all non-portable locator hints like permission hints and
    remote cluster hints removed. The only hints in the returned manifest
    will be size hints.
    """
    pieces = []
    for line in self.manifest_text().split("\n"):
        tokens = line.split()
        if not tokens:
            # Blank lines (including the trailing one) are dropped.
            continue
        stream, *rest = tokens
        kept = [stream]
        for token in rest:
            # Only Keep locators are rewritten: strip every "+hint" whose
            # first character is not a digit, which leaves the size hint.
            if re.match(arvados.util.keep_locator_pattern, token):
                token = re.sub(r'\+[^\d][^\+]*', '', token)
            kept.append(token)
        pieces.append(' '.join(kept))
        pieces.append("\n")
    return ''.join(pieces)
Create a copy of the collection manifest with only size hints
This method returns a string with the current collection’s manifest text with all non-portable locator hints like permission hints and remote cluster hints removed. The only hints in the returned manifest will be size hints.
class RichCollectionBase(CollectionBase):
    """Base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __init__(self, parent=None):
        self.parent = parent
        self._committed = False
        self._has_remote_blocks = False
        self._callback = None
        self._items = {}

    def _my_api(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def writable(self) -> bool:
        """Indicate whether this collection object can be modified

        This method returns `False` if this object is a `CollectionReader`,
        else `True`.
        """
        raise NotImplementedError()

    def root_collection(self) -> 'Collection':
        """Get this collection's root collection object

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the source Collection object.
        """
        raise NotImplementedError()

    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this collection

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the name of the stream you opened.
        """
        raise NotImplementedError()

    @synchronized
    def has_remote_blocks(self) -> bool:
        """Indicate whether the collection refers to remote data

        Returns `True` if the collection manifest includes any Keep locators
        with a remote hint (`+R`), else `False`.
        """
        if self._has_remote_blocks:
            return True
        for item in self:
            if self[item].has_remote_blocks():
                return True
        return False

    @synchronized
    def set_has_remote_blocks(self, val: bool) -> None:
        """Cache whether this collection refers to remote blocks

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        Set this collection's cached "has remote blocks" flag to the given
        value, and propagate the same value to the parent collection, if any.
        """
        self._has_remote_blocks = val
        if self.parent:
            self.parent.set_has_remote_blocks(val)

    @must_be_writable
    @synchronized
    def find_or_create(
            self,
            path: str,
            create_type: CreateType,
    ) -> CollectionItem:
        """Get the item at the given path, creating it if necessary

        If `path` refers to a stream in this collection, returns a
        corresponding `Subcollection` object. If `path` refers to a file in
        this collection, returns a corresponding
        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
        this collection, then this method creates a new object and returns
        it, creating parent streams as needed. The type of object created is
        determined by the value of `create_type`.

        Arguments:

        * path: str --- The path to find or create within this collection.

        * create_type: Literal[COLLECTION, FILE] --- The type of object to
          create at `path` if one does not exist. Passing `COLLECTION`
          creates a stream and returns the corresponding
          `Subcollection`. Passing `FILE` creates a new file and returns the
          corresponding `arvados.arvfile.ArvadosFile`.

        Raises `NotADirectoryError` (`OSError` with errno `ENOTDIR`) if a
        non-final component of `path` names an existing file.
        """
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                if item is None:
                    # Last path component: create an item of the requested type.
                    if create_type == COLLECTION:
                        item = Subcollection(self, pathcomponents[0])
                    else:
                        item = ArvadosFile(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # Intermediate path component: create a parent stream.
                    item = Subcollection(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                else:
                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        else:
            return self

    @synchronized
    def find(self, path: str) -> CollectionItem:
        """Get the item at the given path

        If `path` refers to a stream in this collection, returns a
        corresponding `Subcollection` object. If `path` refers to a file in
        this collection, returns a corresponding
        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
        this collection, then this method returns `None`.

        Raises `arvados.errors.ArgumentError` if `path` is empty, and
        `NotADirectoryError` (`OSError` with errno `ENOTDIR`) if `path`
        starts with `/` or a non-final component of `path` names a file.

        Arguments:

        * path: str --- The path to find within this collection.
        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        if pathcomponents[0] == '':
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

        item = self._items.get(pathcomponents[0])
        if item is None:
            return None
        elif len(pathcomponents) == 1:
            return item
        else:
            if isinstance(item, RichCollectionBase):
                if pathcomponents[1]:
                    return item.find(pathcomponents[1])
                else:
                    # Trailing slash: "dir/" refers to the stream itself.
                    return item
            else:
                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

    @synchronized
    def mkdirs(self, path: str) -> 'Subcollection':
        """Create and return a subcollection at `path`

        If `path` exists within this collection, raises `FileExistsError`.
        Otherwise, creates a stream at that path and returns the
        corresponding `Subcollection`.
        """
        if self.find(path) is not None:
            raise IOError(errno.EEXIST, "Directory or file exists", path)

        return self.find_or_create(path, COLLECTION)

    def open(
            self,
            path: str,
            mode: str="r",
            encoding: Optional[str]=None
    ) -> IO:
        """Open a file-like object within the collection

        This method returns a file-like object that can read and/or write the
        file located at `path` within the collection. If you attempt to write
        a `path` that does not exist, the file is created with `find_or_create`.
        If the file cannot be opened for any other reason, this method raises
        `OSError` with an appropriate errno.

        Arguments:

        * path: str --- The path of the file to open within this collection

        * mode: str --- The mode to open this file. Must be one of `r`, `w`,
          or `a`, optionally followed by `b` or `t`, optionally followed by
          `+`, with the same meanings as `builtins.open`. Any other value
          raises `arvados.errors.ArgumentError`.

        * encoding: str | None --- The text encoding of the file. Only used
          when the file is opened in text mode. The default is
          platform-dependent.
        """
        if not re.search(r'^[rwa][bt]?\+?$', mode):
            raise errors.ArgumentError("Invalid mode {!r}".format(mode))

        if mode[0] == 'r' and '+' not in mode:
            fclass = ArvadosFileReader
            arvfile = self.find(path)
        elif not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        else:
            fclass = ArvadosFileWriter
            arvfile = self.find_or_create(path, FILE)

        if arvfile is None:
            raise IOError(errno.ENOENT, "File not found", path)
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory", path)

        if mode[0] == 'w':
            arvfile.truncate(0)

        # The underlying Arvados file objects always work in binary mode;
        # text mode is layered on top with a TextIOWrapper below.
        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
        if 'b' not in mode:
            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
        return f

    def modified(self) -> bool:
        """Indicate whether this collection has uncommitted changes

        Returns `False` if this collection corresponds to a record loaded from
        the API server, `True` otherwise. This is the inverse of `committed`.
        """
        return not self.committed()

    @synchronized
    def committed(self) -> bool:
        """Indicate whether this collection has an API server record

        Returns `True` if this collection corresponds to a record loaded from
        the API server, `False` otherwise.
        """
        return self._committed

    @synchronized
    def set_committed(self, value: bool=True) -> None:
        """Cache whether this collection has an API server record

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        Set this collection's cached "committed" flag to the given value and
        propagate it as needed: setting `True` also marks every item in this
        collection committed, while setting `False` also marks the parent
        collection uncommitted.
        """
        if value == self._committed:
            return
        if value:
            for k,v in self._items.items():
                v.set_committed(True)
            self._committed = True
        else:
            self._committed = False
            if self.parent is not None:
                self.parent.set_committed(False)

    @synchronized
    def __iter__(self) -> Iterator[str]:
        """Iterate names of streams and files in this collection

        This method does not recurse. It only iterates the contents of this
        collection's corresponding stream.
        """
        return iter(self._items)

    @synchronized
    def __getitem__(self, k: str) -> CollectionItem:
        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection

        This method does not recurse. If you want to search a path, use
        `RichCollectionBase.find` instead.
        """
        return self._items[k]

    @synchronized
    def __contains__(self, k: str) -> bool:
        """Indicate whether this collection has an item with this name

        This method does not recurse. If you want to check a path, use
        `RichCollectionBase.exists` instead.
        """
        return k in self._items

    @synchronized
    def __len__(self) -> int:
        """Get the number of items directly contained in this collection

        This method does not recurse. It only counts the streams and files
        in this collection's corresponding stream.
        """
        return len(self._items)

    @must_be_writable
    @synchronized
    def __delitem__(self, p: str) -> None:
        """Delete an item from this collection's stream

        This method does not recurse. If you want to remove an item by a
        path, use `RichCollectionBase.remove` instead.
        """
        del self._items[p]
        self.set_committed(False)
        self.notify(DEL, self, p, None)

    @synchronized
    def keys(self) -> Iterator[str]:
        """Iterate names of streams and files in this collection

        This method does not recurse. It only iterates the contents of this
        collection's corresponding stream.
        """
        return self._items.keys()

    @synchronized
    def values(self) -> List[CollectionItem]:
        """Get a list of objects in this collection's stream

        The return value includes a `Subcollection` for every stream, and an
        `arvados.arvfile.ArvadosFile` for every file, directly within this
        collection's stream. This method does not recurse.
        """
        return list(self._items.values())

    @synchronized
    def items(self) -> List[Tuple[str, CollectionItem]]:
        """Get a list of `(name, object)` tuples from this collection's stream

        The return value includes a `Subcollection` for every stream, and an
        `arvados.arvfile.ArvadosFile` for every file, directly within this
        collection's stream. This method does not recurse.
        """
        return list(self._items.items())

    def exists(self, path: str) -> bool:
        """Indicate whether this collection includes an item at `path`

        This method returns `True` if `path` refers to a stream or file within
        this collection, else `False`.

        Arguments:

        * path: str --- The path to check for existence within this collection
        """
        return self.find(path) is not None

    @must_be_writable
    @synchronized
    def remove(self, path: str, recursive: bool=False) -> None:
        """Remove the file or stream at `path`

        Arguments:

        * path: str --- The path of the item to remove from the collection

        * recursive: bool --- Controls the method's behavior if `path` refers
          to a nonempty stream. If `False` (the default), this method raises
          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
          items under the stream.

        Raises `OSError` with errno `ENOENT` if `path` does not exist, and
        with errno `ENOTDIR` if a non-final component of `path` is a file.
        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if item is None:
            raise IOError(errno.ENOENT, "File not found", path)
        if len(pathcomponents) == 1:
            if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
            del self._items[pathcomponents[0]]
            self.set_committed(False)
            self.notify(DEL, self, pathcomponents[0], item)
        elif isinstance(item, RichCollectionBase):
            item.remove(pathcomponents[1], recursive=recursive)
        else:
            # A file cannot contain further path components. Report this the
            # same way `find` does instead of failing with AttributeError.
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

    def _clonefrom(self, source):
        for k,v in source.items():
            self._items[k] = v.clone(self, k)

    def clone(self):
        raise NotImplementedError()

    @must_be_writable
    @synchronized
    def add(
            self,
            source_obj: CollectionItem,
            target_name: str,
            overwrite: bool=False,
            reparent: bool=False,
    ) -> None:
        """Copy or move a file or subcollection object to this collection

        Arguments:

        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
          to add to this collection

        * target_name: str --- The path inside this collection where
          `source_obj` should be added.

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_name`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_name` will be replaced with `source_obj`.

        * reparent: bool --- Controls whether this method copies or moves
          `source_obj`. If `False` (the default), `source_obj` is copied into
          this collection. If `True`, `source_obj` is moved into this
          collection.
        """
        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)

        modified_from = None
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        if reparent:
            source_obj._reparent(self, target_name)
            item = source_obj
        else:
            item = source_obj.clone(self, target_name)

        self._items[target_name] = item
        self.set_committed(False)
        if not self._has_remote_blocks and source_obj.has_remote_blocks():
            self.set_has_remote_blocks(True)

        if modified_from:
            self.notify(MOD, self, target_name, (modified_from, item))
        else:
            self.notify(ADD, self, target_name, item)

    def _get_src_target(self, source, target_path, source_collection, create_dest):
        """Resolve `(source_obj, target_dir, target_name)` for copy/rename.

        `source` may be a path (looked up in `source_collection`, default
        `self`) or an item object. If `target_path` ends in "/" (or is empty),
        the source's own name is used, which is only known when `source` is a
        path; otherwise `arvados.errors.ArgumentError` is raised.
        """
        if source_collection is None:
            source_collection = self

        # Find the object
        if isinstance(source, str):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found", source)
            sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None

        # Find parent collection the target path
        targetcomponents = target_path.split("/")

        # Determine the name to use. Guard against `sourcecomponents` being
        # None (source given as an object), which previously raised TypeError
        # instead of the ArgumentError below.
        if targetcomponents[-1]:
            target_name = targetcomponents[-1]
        elif sourcecomponents:
            target_name = sourcecomponents[-1]
        else:
            target_name = None

        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

        if create_dest:
            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        else:
            if len(targetcomponents) > 1:
                target_dir = self.find("/".join(targetcomponents[0:-1]))
            else:
                target_dir = self

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found", target_name)

        # If the target names an existing stream and the source was given as a
        # path, place the source inside that stream under its own name.
        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)

    @must_be_writable
    @synchronized
    def copy(
            self,
            source: Union[str, CollectionItem],
            target_path: str,
            source_collection: Optional['RichCollectionBase']=None,
            overwrite: bool=False,
    ) -> None:
        """Copy a file or subcollection object to this collection

        Arguments:

        * source: str | arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The file or subcollection to
          add to this collection. If `source` is a str, the object will be
          found by looking up this path from `source_collection` (see
          below).

        * target_path: str --- The path inside this collection where the
          source object should be added.

        * source_collection: arvados.collection.Collection | None --- The
          collection to find the source object from when `source` is a
          path. Defaults to the current collection (`self`).

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_path`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_path` will be replaced with `source_obj`.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
        target_dir.add(source_obj, target_name, overwrite, False)

    @must_be_writable
    @synchronized
    def rename(
            self,
            source: Union[str, CollectionItem],
            target_path: str,
            source_collection: Optional['RichCollectionBase']=None,
            overwrite: bool=False,
    ) -> None:
        """Move a file or subcollection object to this collection

        Arguments:

        * source: str | arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The file or subcollection to
          add to this collection. If `source` is a str, the object will be
          found by looking up this path from `source_collection` (see
          below).

        * target_path: str --- The path inside this collection where the
          source object should be added.

        * source_collection: arvados.collection.Collection | None --- The
          collection to find the source object from when `source` is a
          path. Defaults to the current collection (`self`).

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_path`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_path` will be replaced with `source_obj`.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
        if not source_obj.writable():
            raise IOError(errno.EROFS, "Source collection is read only", source)
        target_dir.add(source_obj, target_name, overwrite, True)

    def portable_manifest_text(self, stream_name: str=".") -> str:
        """Get the portable manifest text for this collection

        The portable manifest text is normalized, and does not include access
        tokens. This method does not flush outstanding blocks to Keep.

        Arguments:

        * stream_name: str --- The name to use for this collection's stream in
          the generated manifest. Default `'.'`.
        """
        return self._get_manifest_text(stream_name, True, True)

    @synchronized
    def manifest_text(
            self,
            stream_name: str=".",
            strip: bool=False,
            normalize: bool=False,
            only_committed: bool=False,
    ) -> str:
        """Get the manifest text for this collection

        Arguments:

        * stream_name: str --- The name to use for this collection's stream in
          the generated manifest. Default `'.'`.

        * strip: bool --- Controls whether or not the returned manifest text
          includes access tokens. If `False` (the default), the manifest text
          will include access tokens. If `True`, the manifest text will not
          include access tokens.

        * normalize: bool --- Controls whether or not the returned manifest
          text is normalized. Default `False`.

        * only_committed: bool --- Controls whether or not this method uploads
          pending data to Keep before building and returning the manifest text.
          If `False` (the default), this method will finish uploading all data
          to Keep, then return the final manifest. If `True`, this method will
          build and return a manifest that only refers to the data that has
          finished uploading at the time this method was called.
        """
        if not only_committed:
            self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=only_committed)

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.

        """

        if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # Block not yet uploaded: skip it, or wait for its
                        # final Keep locator, depending on `only_committed`.
                        if only_committed:
                            continue
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                         segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(
                    stream_name=os.path.join(stream_name, dirname),
                    strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text

    @synchronized
    def _copy_remote_blocks(self, remote_blocks=None):
        """Scan through the entire collection and ask Keep to copy remote blocks.

        When accessing a remote collection, blocks will have a remote signature
        (+R instead of +A). Collect these signatures and request Keep to copy the
        blocks to the local cluster, returning local (+A) signatures.

        :remote_blocks:
          Shared cache of remote to local block mappings. This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories. When omitted, a fresh cache is created for
          this call; previously a single dict default was shared by every call.

        """
        if remote_blocks is None:
            remote_blocks = {}
        for item in self:
            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
        return remote_blocks

    @synchronized
    def diff(
            self,
            end_collection: 'RichCollectionBase',
            prefix: str=".",
            holding_collection: Optional['Collection']=None,
    ) -> ChangeList:
        """Build a list of differences between this collection and another

        Arguments:

        * end_collection: arvados.collection.RichCollectionBase --- A
          collection object with the desired end state. The returned diff
          list will describe how to go from the current collection object
          `self` to `end_collection`.

        * prefix: str --- The name to use for this collection's stream in
          the diff list. Default `'.'`.

        * holding_collection: arvados.collection.Collection | None --- A
          collection object used to hold objects for the returned diff
          list. By default, a new empty collection is created.
        """
        changes = []
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        for k in self:
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            if k in self:
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                else:
                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        return changes

    @must_be_writable
    @synchronized
    def apply(self, changes: ChangeList) -> None:
        """Apply a list of changes to this collection

        This method takes a list of changes generated by
        `RichCollectionBase.diff` and applies it to this
        collection. Afterward, the state of this collection object will
        match the state of `end_collection` passed to `diff`. If a change
        conflicts with a local change, it will be saved to an alternate path
        indicating the conflict.

        Arguments:

        * changes: arvados.collection.ChangeList --- The list of differences
          generated by `RichCollectionBase.diff`.
        """
        if changes:
            self.set_committed(False)
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.

    def portable_data_hash(self) -> str:
        """Get the portable data hash for this collection's manifest"""
        if self._manifest_locator and self.committed():
            # If the collection is already saved on the API server, and it's committed
            # then return API server's PDH response.
            return self._portable_data_hash
        else:
            stripped = self.portable_manifest_text().encode()
            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

    @synchronized
    def subscribe(self, callback: ChangeCallback) -> None:
        """Set a notify callback for changes to this collection

        Arguments:

        * callback: arvados.collection.ChangeCallback --- The callable to
          call each time the collection is changed.

        Raises `arvados.errors.ArgumentError` if a callback is already set.
        """
        if self._callback is None:
            self._callback = callback
        else:
            raise errors.ArgumentError("A callback is already set on this collection.")

    @synchronized
    def unsubscribe(self) -> None:
        """Remove any notify callback set for changes to this collection"""
        if self._callback is not None:
            self._callback = None

    @synchronized
    def notify(
            self,
            event: ChangeType,
            collection: 'RichCollectionBase',
            name: str,
            item: CollectionItem,
    ) -> None:
        """Notify any subscribed callback about a change to this collection

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        If a callback has been registered with `RichCollectionBase.subscribe`,
        it will be called with information about a change to this collection.
        Then this notification will be propagated to this collection's root.

        Arguments:

        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
          the collection.

        * collection: arvados.collection.RichCollectionBase --- The
          collection that was modified.

        * name: str --- The name of the file or stream within `collection` that
          was modified.

        * item: arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The new contents at `name`
          within `collection`.
        """
        if self._callback:
            self._callback(event, collection, name, item)
        self.root_collection().notify(event, collection, name, item)

    @synchronized
    def __eq__(self, other: Any) -> bool:
        """Indicate whether this collection object is equal to another

        Two collections are equal when they hold the same item names and
        each pair of same-named items compares equal.
        """
        if other is self:
            return True
        if not isinstance(other, RichCollectionBase):
            return False
        if len(self._items) != len(other):
            return False
        for k in self._items:
            if k not in other:
                return False
            if self._items[k] != other[k]:
                return False
        return True

    def __ne__(self, other: Any) -> bool:
        """Indicate whether this collection object is not equal to another"""
        return not self.__eq__(other)

    @synchronized
    def flush(self) -> None:
        """Upload any pending data to Keep"""
        for e in self.values():
            e.flush()
Base class for Collection classes
def writable(self) -> bool:
    """Report whether this collection object accepts modifications

    Concrete collection classes must override this. A `CollectionReader`
    answers `False`; every other collection class answers `True`.
    """
    raise NotImplementedError
Indicate whether this collection object can be modified
This method returns `False` if this object is a `CollectionReader`, else `True`.
def root_collection(self) -> 'Collection':
    """Return the top-level collection object this one belongs to

    For a subcollection obtained through `Collection.find`, the result is
    the `Collection` object it was found in. Subclasses must override this.
    """
    raise NotImplementedError
Get this collection’s root collection object
If you open a subcollection with `Collection.find`, calling this method on that subcollection returns the source `Collection` object.
def stream_name(self) -> str:
    """Return the manifest stream name this collection object stands for

    For a subcollection obtained through `Collection.find`, the result is
    the name of the stream that was opened. Subclasses must override this.
    """
    raise NotImplementedError
Get the name of the manifest stream represented by this collection
If you open a subcollection with `Collection.find`, calling this method on that subcollection returns the name of the stream you opened.
@synchronized
def has_remote_blocks(self) -> bool:
    """Indicate whether the collection refers to remote data

    Returns `True` if the collection manifest includes any Keep locators
    with a remote hint (`+R`), else `False`.

    The cached flag maintained by `set_has_remote_blocks` is consulted
    first. If it is not set, each item directly inside this collection is
    asked in turn, and the search stops at the first item that reports
    remote blocks.
    """
    if self._has_remote_blocks:
        return True
    for name in self:
        if self[name].has_remote_blocks():
            return True
    return False
Indicate whether the collection refers to remote data
Returns `True` if the collection manifest includes any Keep locators with a remote hint (`+R`), else `False`.
@synchronized
def set_has_remote_blocks(self, val: bool) -> None:
    """Cache whether this collection refers to remote blocks

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    Set this collection's cached "has remote blocks" flag to the given
    value, then propagate the same value to the parent collection, if any.
    """
    self._has_remote_blocks = val
    # Compare against None rather than testing truthiness: collections
    # define __len__, so an empty parent collection would be falsy and
    # would silently stop the propagation. This matches set_committed.
    if self.parent is not None:
        self.parent.set_has_remote_blocks(val)
Cache whether this collection refers to remote blocks
Set this collection’s cached “has remote blocks” flag to the given value.
@must_be_writable
@synchronized
def find_or_create(
        self,
        path: str,
        create_type: CreateType,
) -> CollectionItem:
    """Get the item at the given path, creating it if necessary

    Returns the `Subcollection` for a stream, or the
    `arvados.arvfile.ArvadosFile` for a file, found at `path`. When nothing
    exists there, a new object is created and returned, along with any
    missing parent streams. An empty first path component returns this
    collection itself.

    Arguments:

    * path: str --- The path to find or create within this collection.

    * create_type: Literal[COLLECTION, FILE] --- What to create at `path`
      when nothing exists there. `COLLECTION` creates a stream and returns
      its `Subcollection`; `FILE` creates a new file and returns its
      `arvados.arvfile.ArvadosFile`. Intermediate components are always
      created as streams.

    Raises `NotADirectoryError` when an intermediate component of `path`
    names a file.
    """
    head, sep, rest = path.partition("/")
    if not head:
        return self

    item = self._items.get(head)
    if not sep:
        # Final component: create the requested kind of object if absent.
        if item is None:
            if create_type == COLLECTION:
                item = Subcollection(self, head)
            else:
                item = ArvadosFile(self, head)
            self._items[head] = item
            self.set_committed(False)
            self.notify(ADD, self, head, item)
        return item

    # Intermediate component: it must be, or become, a stream.
    if item is None:
        item = Subcollection(self, head)
        self._items[head] = item
        self.set_committed(False)
        self.notify(ADD, self, head, item)
    if not isinstance(item, RichCollectionBase):
        raise IOError(errno.ENOTDIR, "Not a directory", head)
    return item.find_or_create(rest, create_type)
Get the item at the given path, creating it if necessary
If `path` refers to a stream in this collection, returns a corresponding `Subcollection` object. If `path` refers to a file in this collection, returns a corresponding `arvados.arvfile.ArvadosFile` object. If `path` does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of `create_type`.
Arguments:
path: str — The path to find or create within this collection.
create_type: Literal[COLLECTION, FILE] — The type of object to create at `path` if one does not exist. Passing `COLLECTION` creates a stream and returns the corresponding `Subcollection`. Passing `FILE` creates a new file and returns the corresponding `arvados.arvfile.ArvadosFile`.
@synchronized
def find(self, path: str) -> Optional[CollectionItem]:
    """Get the item at the given path

    If `path` refers to a stream in this collection, returns a
    corresponding `Subcollection` object. If `path` refers to a file in
    this collection, returns a corresponding
    `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
    this collection, returns `None`. A trailing slash after a stream name
    (for example `"dir/"`) returns that stream.

    Arguments:

    * path: str --- The path to find within this collection.

    Raises:

    * `arvados.errors.ArgumentError` if `path` is empty.

    * `NotADirectoryError` if `path` starts with `/`, or if a component
      before the last one refers to a file rather than a stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    pathcomponents = path.split("/", 1)
    if pathcomponents[0] == '':
        raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

    item = self._items.get(pathcomponents[0])
    if item is None:
        return None
    elif len(pathcomponents) == 1:
        return item
    else:
        if isinstance(item, RichCollectionBase):
            if pathcomponents[1]:
                return item.find(pathcomponents[1])
            else:
                # "name/" with nothing after the slash refers to the stream itself.
                return item
        else:
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
Get the item at the given path
If `path` refers to a stream in this collection, returns a corresponding `Subcollection` object. If `path` refers to a file in this collection, returns a corresponding `arvados.arvfile.ArvadosFile` object. If `path` does not exist in this collection, this method returns `None`. It raises `NotADirectoryError` if `path` starts with `/`, or if a component before the last one refers to a file rather than a stream.
Arguments:
- path: str — The path to find within this collection.
@synchronized
def mkdirs(self, path: str) -> 'Subcollection':
    """Create and return a subcollection at `path`

    If `path` exists within this collection, raises `FileExistsError`.
    Otherwise, creates a stream at that path, along with any missing
    parent streams, and returns the corresponding `Subcollection`.
    """
    # Test for absence with `is not None`. `!=` would dispatch to the item's
    # own __ne__/__eq__ (a content comparison) just to detect None.
    if self.find(path) is not None:
        raise IOError(errno.EEXIST, "Directory or file exists", path)

    return self.find_or_create(path, COLLECTION)
Create and return a subcollection at path
If `path` exists within this collection, raises `FileExistsError`. Otherwise, creates a stream at that path and returns the corresponding `Subcollection`.
def open(
        self,
        path: str,
        mode: str="r",
        encoding: Optional[str]=None
) -> IO:
    """Open a file-like object within the collection

    This method returns a file-like object that can read and/or write the
    file located at `path` within the collection. If you open a `path`
    that does not exist in any mode other than plain reading, the file is
    created with `find_or_create`. If the file cannot be opened for any
    other reason, this method raises `OSError` with an appropriate errno.

    Arguments:

    * path: str --- The path of the file to open within this collection

    * mode: str --- The mode to open this file. Accepts `'r'`, `'w'`, or
      `'a'`, optionally followed by `'b'` or `'t'`, optionally followed
      by `'+'` (for example `'rb+'`). This is a subset of the modes
      accepted by `builtins.open`; other spellings such as `'r+b'` or
      `'x'` are rejected.

    * encoding: str | None --- The text encoding of the file. Only used
      when the file is opened in text mode. The default is
      platform-dependent.

    Raises:

    * `arvados.errors.ArgumentError` if `mode` is not an accepted mode.

    * `FileNotFoundError` if `path` does not exist and `mode` is
      read-only (`'r'`, `'rb'`, or `'rt'`).

    * `IsADirectoryError` if `path` refers to a stream.

    * `OSError` with errno `EROFS` if `mode` allows writing and this
      collection is not writable.
    """
    if not re.search(r'^[rwa][bt]?\+?$', mode):
        raise errors.ArgumentError("Invalid mode {!r}".format(mode))

    if mode[0] == 'r' and '+' not in mode:
        fclass = ArvadosFileReader
        arvfile = self.find(path)
    elif not self.writable():
        raise IOError(errno.EROFS, "Collection is read only")
    else:
        fclass = ArvadosFileWriter
        arvfile = self.find_or_create(path, FILE)

    if arvfile is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if not isinstance(arvfile, ArvadosFile):
        raise IOError(errno.EISDIR, "Is a directory", path)

    if mode[0] == 'w':
        arvfile.truncate(0)

    # The Arvados file objects always work in binary. Text mode is layered
    # on top below with a buffered TextIOWrapper.
    binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
    f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
    if 'b' not in mode:
        bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
        f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
    return f
Open a file-like object within the collection
This method returns a file-like object that can read and/or write the file located at `path` within the collection. If you attempt to write a `path` that does not exist, the file is created with `find_or_create`. If the file cannot be opened for any other reason, this method raises `OSError` with an appropriate errno.
Arguments:
path: str — The path of the file to open within this collection
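mode: str — The mode to open this file. Accepts `r`, `w`, or `a`, optionally followed by `b` or `t`, optionally followed by `+` (for example `rb+`). This is a subset of the modes accepted by `builtins.open`.
encoding: str | None — The text encoding of the file. Only used when the file is opened in text mode. The default is platform-dependent.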
def modified(self) -> bool:
    """Indicate whether this collection has uncommitted changes

    Returns `True` if this collection has been changed since it was last
    marked committed, `False` otherwise. This is always the inverse of
    `committed`.
    """
    return not self.committed()
Indicate whether this collection has uncommitted changes
Returns `True` if this collection has been changed since it was last marked committed, `False` otherwise. This is the inverse of `committed`.
@synchronized
def committed(self) -> bool:
    """Indicate whether this collection has an API server record

    Returns the cached committed flag. It is `True` if this collection
    corresponds to a record loaded from the API server and has not been
    modified since, `False` otherwise. Modifying methods such as `add`,
    `remove`, and `find_or_create` clear the flag with
    `set_committed(False)`.
    """
    return self._committed
Indicate whether this collection has an API server record
Returns `True` if this collection corresponds to a record loaded from the API server and has not been modified since, `False` otherwise.
@synchronized
def set_committed(self, value: bool=True) -> None:
    """Cache whether this collection has an API server record

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    Set this collection's cached "committed" flag to the given value and
    propagate it as needed. Marking a collection committed also marks
    every item directly inside it committed. Marking it uncommitted also
    marks its parent uncommitted, which in turn walks up to the root.
    """
    if value == self._committed:
        return
    if value:
        for item in self._items.values():
            item.set_committed(True)
        self._committed = True
    else:
        self._committed = False
        if self.parent is not None:
            self.parent.set_committed(False)
Cache whether this collection has an API server record
Set this collection’s cached “committed” flag to the given value and propagate it as needed.
@synchronized
def keys(self) -> Iterator[str]:
    """Iterate names of streams and files in this collection

    Only names directly inside this collection's stream are produced; the
    method does not descend into nested streams.
    """
    # NOTE(review): this hands back the internal dict's live key view, not a
    # snapshot, and the view outlives the @synchronized call. Mutating the
    # collection while iterating it may raise RuntimeError; confirm callers
    # copy it when needed.
    entries = self._items
    return entries.keys()
Iterate names of streams and files in this collection
This method does not recurse. It only iterates the contents of this collection’s corresponding stream.
@synchronized
def values(self) -> List[CollectionItem]:
    """Get a list of objects in this collection's stream

    The list holds one `Subcollection` per stream and one
    `arvados.arvfile.ArvadosFile` per file found directly in this
    collection's stream; nested streams are not descended into. The list
    is a fresh copy.
    """
    return [*self._items.values()]
Get a list of objects in this collection’s stream
The return value includes a `Subcollection` for every stream, and an `arvados.arvfile.ArvadosFile` for every file, directly within this collection’s stream. This method does not recurse.
@synchronized
def items(self) -> List[Tuple[str, CollectionItem]]:
    """Get a list of `(name, object)` tuples from this collection's stream

    Each object is a `Subcollection` for a stream or an
    `arvados.arvfile.ArvadosFile` for a file found directly in this
    collection's stream; nested streams are not descended into. The list
    is a fresh copy.
    """
    return [*self._items.items()]
Get a list of `(name, object)` tuples from this collection’s stream
The return value includes a `Subcollection` for every stream, and an `arvados.arvfile.ArvadosFile` for every file, directly within this collection’s stream. This method does not recurse.
def exists(self, path: str) -> bool:
    """Report whether an item exists at `path` in this collection

    Answers `True` when `path` names a stream or a file within this
    collection and `False` otherwise. The lookup is delegated to `find`,
    so any error `find` raises (for example on an empty `path`)
    propagates unchanged.

    Arguments:

    * path: str --- The path to look up within this collection
    """
    found = self.find(path)
    return found is not None
Indicate whether this collection includes an item at path
This method returns `True` if `path` refers to a stream or file within this collection, else `False`.
Arguments:
- path: str — The path to check for existence within this collection
@must_be_writable
@synchronized
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the file or stream at `path`

    Arguments:

    * path: str --- The path of the item to remove from the collection

    * recursive: bool --- Controls the method's behavior if `path` refers
      to a nonempty stream. If `False` (the default), this method raises
      `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
      items under the stream.

    Raises:

    * `arvados.errors.ArgumentError` if `path` is empty.

    * `FileNotFoundError` if nothing exists at `path`.

    * `NotADirectoryError` if a component before the last one refers to a
      file rather than a stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    pathcomponents = path.split("/", 1)
    item = self._items.get(pathcomponents[0])
    if item is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if len(pathcomponents) == 1:
        if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
            raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
        del self._items[pathcomponents[0]]
        self.set_committed(False)
        self.notify(DEL, self, pathcomponents[0], item)
    elif isinstance(item, RichCollectionBase):
        item.remove(pathcomponents[1], recursive=recursive)
    else:
        # Only streams can hold further path components. Report ENOTDIR, as
        # `find` does, instead of calling `remove` on a non-collection item.
        raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
Remove the file or stream at path
Arguments:
path: str — The path of the item to remove from the collection
recursive: bool — Controls the method’s behavior if `path` refers to a nonempty stream. If `False` (the default), this method raises `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all items under the stream.
@must_be_writable
@synchronized
def add(
        self,
        source_obj: CollectionItem,
        target_name: str,
        overwrite: bool=False,
        reparent: bool=False,
) -> None:
    """Copy or move a file or subcollection object to this collection

    Arguments:

    * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
      to add to this collection

    * target_name: str --- The path inside this collection where
      `source_obj` should be added.

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_name`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_name` will be replaced with `source_obj`.

    * reparent: bool --- Controls whether this method copies or moves
      `source_obj`. If `False` (the default), `source_obj` is copied into
      this collection. If `True`, `source_obj` is moved into this
      collection.

    Subscribers are notified with `MOD` (carrying `(old, new)`) when an
    existing object was replaced, and with `ADD` otherwise.
    """
    replacing = target_name in self
    if replacing and not overwrite:
        raise IOError(errno.EEXIST, "File already exists", target_name)

    modified_from = self[target_name] if replacing else None

    # Actually make the move or copy.
    if reparent:
        source_obj._reparent(self, target_name)
        item = source_obj
    else:
        item = source_obj.clone(self, target_name)

    self._items[target_name] = item
    self.set_committed(False)
    if not self._has_remote_blocks and source_obj.has_remote_blocks():
        self.set_has_remote_blocks(True)

    # Test against None explicitly: collections define __len__, so an empty
    # Subcollection being replaced is falsy and would otherwise be reported
    # as an ADD instead of a MOD.
    if modified_from is not None:
        self.notify(MOD, self, target_name, (modified_from, item))
    else:
        self.notify(ADD, self, target_name, item)
Copy or move a file or subcollection object to this collection
Arguments:
source_obj: arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection
target_name: str — The path inside this collection where `source_obj` should be added.
overwrite: bool — Controls the behavior of this method when the collection already contains an object at `target_name`. If `False` (the default), this method will raise `FileExistsError`. If `True`, the object at `target_name` will be replaced with `source_obj`.
reparent: bool — Controls whether this method copies or moves `source_obj`. If `False` (the default), `source_obj` is copied into this collection. If `True`, `source_obj` is moved into this collection.
@must_be_writable
@synchronized
def copy(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Copy a file or subcollection object to this collection

    The source is left in place; a clone of it is added at `target_path`.

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      copy. A str is treated as a path and looked up in
      `source_collection` (see below).

    * target_path: str --- Where inside this collection the copy should
      be placed.

    * source_collection: arvados.collection.Collection | None --- The
      collection in which a str `source` is looked up. Defaults to this
      collection (`self`).

    * overwrite: bool --- What to do when this collection already holds
      an object at `target_path`. `False` (the default) raises
      `FileExistsError`; `True` replaces that object with the copy.
    """
    resolved = self._get_src_target(source, target_path, source_collection, True)
    source_obj, target_dir, target_name = resolved
    target_dir.add(source_obj, target_name, overwrite, False)
Copy a file or subcollection object to this collection
Arguments:
source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection. If `source` is a str, the object will be found by looking up this path from `source_collection` (see below).
target_path: str — The path inside this collection where the source object should be added.
source_collection: Collection | None — The collection to find the source object from when `source` is a path. Defaults to the current collection (`self`).
overwrite: bool — Controls the behavior of this method when the collection already contains an object at `target_path`. If `False` (the default), this method will raise `FileExistsError`. If `True`, the object at `target_path` will be replaced with the source object.
@must_be_writable
@synchronized
def rename(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Move a file or subcollection object to this collection

    The source object itself is reparented under `target_path`, so it must
    be writable.

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      move. A str is treated as a path and looked up in
      `source_collection` (see below).

    * target_path: str --- Where inside this collection the object should
      end up.

    * source_collection: arvados.collection.Collection | None --- The
      collection in which a str `source` is looked up. Defaults to this
      collection (`self`).

    * overwrite: bool --- What to do when this collection already holds
      an object at `target_path`. `False` (the default) raises
      `FileExistsError`; `True` replaces that object with the moved one.

    Raises `OSError` with errno `EROFS` if the source object is read only.
    """
    resolved = self._get_src_target(source, target_path, source_collection, False)
    source_obj, target_dir, target_name = resolved
    if source_obj.writable():
        target_dir.add(source_obj, target_name, overwrite, True)
    else:
        raise IOError(errno.EROFS, "Source collection is read only", source)
Move a file or subcollection object to this collection
Arguments:
source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to move into this collection. If `source` is a str, the object will be found by looking up this path from `source_collection` (see below).
target_path: str — The path inside this collection where the source object should be added.
source_collection: Collection | None — The collection to find the source object from when `source` is a path. Defaults to the current collection (`self`).
overwrite: bool — Controls the behavior of this method when the collection already contains an object at `target_path`. If `False` (the default), this method will raise `FileExistsError`. If `True`, the object at `target_path` will be replaced with the source object.
def portable_manifest_text(self, stream_name: str=".") -> str:
    """Get the portable manifest text for this collection

    "Portable" means the text is normalized and carries no access tokens.
    Outstanding blocks are not flushed to Keep first.

    Arguments:

    * stream_name: str --- Name given to this collection's stream in the
      generated manifest. Default `'.'`.
    """
    strip_tokens = True
    normalize = True
    return self._get_manifest_text(stream_name, strip_tokens, normalize)
Get the portable manifest text for this collection
The portable manifest text is normalized, and does not include access tokens. This method does not flush outstanding blocks to Keep.
Arguments:
- stream_name: str — The name to use for this collection’s stream in the generated manifest. Default `'.'`.
@synchronized
def manifest_text(
        self,
        stream_name: str=".",
        strip: bool=False,
        normalize: bool=False,
        only_committed: bool=False,
) -> str:
    """Get the manifest text for this collection

    Arguments:

    * stream_name: str --- Name given to this collection's stream in the
      generated manifest. Default `'.'`.

    * strip: bool --- Whether to leave access tokens out of the manifest.
      `False` (the default) keeps them; `True` removes them.

    * normalize: bool --- Whether to normalize the manifest text. Default
      `False`.

    * only_committed: bool --- Whether to skip uploading pending data.
      With `False` (the default), all pending data is first uploaded to
      Keep and the final manifest is returned. With `True`, the manifest
      only refers to data that had already finished uploading when this
      method was called.
    """
    if not only_committed:
        # Finish uploading pending data so the manifest is complete.
        block_manager = self._my_block_manager()
        block_manager.commit_all()
    return self._get_manifest_text(
        stream_name,
        strip,
        normalize,
        only_committed=only_committed,
    )
Get the manifest text for this collection
Arguments:
stream_name: str — The name to use for this collection’s stream in the generated manifest. Default `'.'`.
strip: bool — Controls whether or not the returned manifest text includes access tokens. If `False` (the default), the manifest text will include access tokens. If `True`, the manifest text will not include access tokens.
normalize: bool — Controls whether or not the returned manifest text is normalized. Default `False`.
only_committed: bool — Controls whether or not this method uploads pending data to Keep before building and returning the manifest text. If `False` (the default), this method will finish uploading all data to Keep, then return the final manifest. If `True`, this method will build and return a manifest that only refers to the data that has finished uploading at the time this method was called.
@synchronized
def diff(
        self,
        end_collection: 'RichCollectionBase',
        prefix: str=".",
        holding_collection: Optional['Collection']=None,
) -> ChangeList:
    """Build a list of differences between this collection and another

    Entries are emitted first as `DEL` for every name only in `self`, then
    for every name in `end_collection`:
    * `ADD` if the name is new;
    * a recursive diff if both sides are streams;
    * `MOD` if the two sides differ;
    * `TOK` if the two sides compare equal.
    Every object placed in the list is a clone held by
    `holding_collection`.

    Arguments:

    * end_collection: arvados.collection.RichCollectionBase --- The
      collection object holding the desired end state. The result
      describes how to get from `self` to `end_collection`.

    * prefix: str --- Name used for this collection's stream in the diff
      list. Default `'.'`.

    * holding_collection: arvados.collection.Collection | None --- The
      collection that owns the cloned objects in the result. When
      omitted, a new empty collection is created.
    """
    if holding_collection is None:
        holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())

    def held(obj):
        # Clone `obj` into the holding collection under an empty name.
        return obj.clone(holding_collection, "")

    changes = []
    for name in self:
        if name not in end_collection:
            changes.append((DEL, os.path.join(prefix, name), held(self[name])))

    for name in end_collection:
        path = os.path.join(prefix, name)
        if name not in self:
            changes.append((ADD, path, held(end_collection[name])))
            continue
        mine = self[name]
        theirs = end_collection[name]
        if isinstance(theirs, Subcollection) and isinstance(mine, Subcollection):
            changes.extend(mine.diff(theirs, path, holding_collection))
        elif theirs != mine:
            changes.append((MOD, path, held(mine), held(theirs)))
        else:
            changes.append((TOK, path, held(mine), held(theirs)))
    return changes
Build a list of differences between this collection and another
Arguments:
end_collection: RichCollectionBase — A collection object with the desired end state. The returned diff list will describe how to go from the current collection object `self` to `end_collection`.
prefix: str — The name to use for this collection’s stream in the diff list. Default `'.'`.
holding_collection: Collection | None — A collection object used to hold objects for the returned diff list. By default, a new empty collection is created.
@must_be_writable
@synchronized
def apply(self, changes: ChangeList) -> None:
    """Apply a list of changes to this collection

    This method takes a list of changes generated by
    `RichCollectionBase.diff` and applies it to this collection. Afterward,
    the state of this collection object will match the state of
    `end_collection` passed to `diff`, except where a change conflicts with
    a local change. Such a change is saved instead to an alternate path,
    `<path>~<UTC timestamp>~conflict~`, indicating the conflict.

    Arguments:

    * changes: arvados.collection.ChangeList --- The list of differences
      generated by `RichCollectionBase.diff`.
    """
    def conflict_path(path: str) -> str:
        # Sibling path that records a conflicting change, stamped with the
        # current UTC time. Only computed when a conflict actually occurs.
        return "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                         time.gmtime()))

    if changes:
        self.set_committed(False)
    for change in changes:
        event_type = change[0]
        path = change[1]
        initial = change[2]
        local = self.find(path)
        if event_type == ADD:
            if local is None:
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
            elif local != initial:
                # There is already a local file and it is different:
                # save change to conflict file.
                self.copy(initial, conflict_path(path))
        elif event_type == MOD or event_type == TOK:
            final = change[3]
            if local == initial:
                # Local matches the "initial" item so it has not
                # changed locally and is safe to update.
                if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                    # Replace contents of local file with new contents
                    local.replace_contents(final)
                else:
                    # Overwrite path with new item; this can happen if
                    # path was a file and is now a collection or vice versa
                    self.copy(final, path, overwrite=True)
            else:
                # Local is missing (presumably deleted) or local doesn't
                # match the "start" value, so save change to conflict file
                self.copy(final, conflict_path(path))
        elif event_type == DEL:
            if local == initial:
                # Local item matches "initial" value, so it is safe to remove.
                self.remove(path, recursive=True)
            # else, the file is modified or already removed, in either
            # case we don't want to try to remove it.
Apply a list of changes to this collection
This method takes a list of changes generated by RichCollectionBase.diff and applies it to this collection. Afterward, the state of this collection object will match the state of end_collection passed to diff. If a change conflicts with a local change, it will be saved to an alternate path indicating the conflict.
Arguments:
- changes: arvados.collection.ChangeList — The list of differences generated by RichCollectionBase.diff.
def portable_data_hash(self) -> str:
    """Return the portable data hash of this collection's manifest

    When this object has a manifest locator and no uncommitted changes,
    the hash recorded from the API server is returned unchanged.
    Otherwise the hash is computed locally as `<md5 hex digest>+<length>`
    over the UTF-8 bytes of `portable_manifest_text()`.
    """
    use_server_hash = self._manifest_locator and self.committed()
    if use_server_hash:
        return self._portable_data_hash
    manifest_bytes = self.portable_manifest_text().encode()
    digest = hashlib.md5(manifest_bytes).hexdigest()
    return f"{digest}+{len(manifest_bytes)}"
Get the portable data hash for this collection’s manifest
@synchronized
def subscribe(self, callback: ChangeCallback) -> None:
    """Register a callback to be notified of changes to this collection

    Only one callback can be registered at a time. Call
    `RichCollectionBase.unsubscribe` first to replace an existing one.

    Arguments:

    * callback: arvados.collection.ChangeCallback --- The callable to
      invoke each time the collection is changed.

    Raises `arvados.errors.ArgumentError` if a callback is already set.
    """
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback
Set a notify callback for changes to this collection
Arguments:
- callback: arvados.collection.ChangeCallback — The callable to call each time the collection is changed.
@synchronized
def unsubscribe(self) -> None:
    """Remove the change callback registered with `subscribe`, if any

    Does nothing when no callback is currently registered.
    """
    if self._callback is None:
        return
    self._callback = None
Remove any notify callback set for changes to this collection
@synchronized
def notify(
        self,
        event: ChangeType,
        collection: 'RichCollectionBase',
        name: str,
        item: CollectionItem,
) -> None:
    """Notify any subscribed callback about a change to this collection

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    If a callback has been registered with `RichCollectionBase.subscribe`,
    it will be called with information about a change to this collection.
    If this collection is not itself the root, the notification is then
    propagated to this collection's root.

    Arguments:

    * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
      the collection.

    * collection: arvados.collection.RichCollectionBase --- The
      collection that was modified.

    * name: str --- The name of the file or stream within `collection` that
      was modified.

    * item: arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The new contents at `name`
      within `collection`.
    """
    if self._callback:
        self._callback(event, collection, name, item)
    root = self.root_collection()
    # A root collection is its own root (e.g. `Collection.root_collection`
    # returns `self`). Forwarding to ourselves would fire the callback again
    # and recurse without end, so only propagate to a distinct root.
    if root is not self:
        root.notify(event, collection, name, item)
Notify any subscribed callback about a change to this collection
If a callback has been registered with RichCollectionBase.subscribe, it will be called with information about a change to this collection. Then this notification will be propagated to this collection’s root.
Arguments:
- event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.
- collection: RichCollectionBase — The collection that was modified.
- name: str — The name of the file or stream within collection that was modified.
- item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at name within collection.
@synchronized
def flush(self) -> None:
    """Upload any pending data to Keep

    Calls `flush()` on each item returned by `self.values()`.
    """
    children = self.values()
    for child in children:
        child.flush()
Upload any pending data to Keep
Inherited Members
1027class Collection(RichCollectionBase): 1028 """Read and manipulate an Arvados collection 1029 1030 This class provides a high-level interface to create, read, and update 1031 Arvados collections and their contents. Refer to the Arvados Python SDK 1032 cookbook for [an introduction to using the Collection class][cookbook]. 1033 1034 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 1035 """ 1036 1037 def __init__(self, manifest_locator_or_text: Optional[str]=None, 1038 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, 1039 keep_client: Optional['arvados.keep.KeepClient']=None, 1040 num_retries: int=10, 1041 parent: Optional['Collection']=None, 1042 apiconfig: Optional[Mapping[str, str]]=None, 1043 block_manager: Optional['arvados.arvfile._BlockManager']=None, 1044 replication_desired: Optional[int]=None, 1045 storage_classes_desired: Optional[List[str]]=None, 1046 put_threads: Optional[int]=None): 1047 """Initialize a Collection object 1048 1049 Arguments: 1050 1051 * manifest_locator_or_text: str | None --- This string can contain a 1052 collection manifest text, portable data hash, or UUID. When given a 1053 portable data hash or UUID, this instance will load a collection 1054 record from the API server. Otherwise, this instance will represent a 1055 new collection without an API server record. The default value `None` 1056 instantiates a new collection with an empty manifest. 1057 1058 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The 1059 Arvados API client object this instance uses to make requests. If 1060 none is given, this instance creates its own client using the 1061 settings from `apiconfig` (see below). If your client instantiates 1062 many Collection objects, you can help limit memory utilization by 1063 calling `arvados.api.api` to construct an 1064 `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client` 1065 for every Collection. 
1066 1067 * keep_client: arvados.keep.KeepClient | None --- The Keep client 1068 object this instance uses to make requests. If none is given, this 1069 instance creates its own client using its `api_client`. 1070 1071 * num_retries: int --- The number of times that client requests are 1072 retried. Default 10. 1073 1074 * parent: arvados.collection.Collection | None --- The parent Collection 1075 object of this instance, if any. This argument is primarily used by 1076 other Collection methods; user client code shouldn't need to use it. 1077 1078 * apiconfig: Mapping[str, str] | None --- A mapping with entries for 1079 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally 1080 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the 1081 Collection object constructs one from these settings. If no 1082 mapping is provided, calls `arvados.config.settings` to get these 1083 parameters from user configuration. 1084 1085 * block_manager: arvados.arvfile._BlockManager | None --- The 1086 _BlockManager object used by this instance to coordinate reading 1087 and writing Keep data blocks. If none is given, this instance 1088 constructs its own. This argument is primarily used by other 1089 Collection methods; user client code shouldn't need to use it. 1090 1091 * replication_desired: int | None --- This controls both the value of 1092 the `replication_desired` field on API collection records saved by 1093 this class, as well as the number of Keep services that the object 1094 writes new data blocks to. If none is given, uses the default value 1095 configured for the cluster. 1096 1097 * storage_classes_desired: list[str] | None --- This controls both 1098 the value of the `storage_classes_desired` field on API collection 1099 records saved by this class, as well as selecting which specific 1100 Keep services the object writes new data blocks to. If none is 1101 given, defaults to an empty list. 
1102 1103 * put_threads: int | None --- The number of threads to run 1104 simultaneously to upload data blocks to Keep. This value is used when 1105 building a new `block_manager`. It is unused when a `block_manager` 1106 is provided. 1107 """ 1108 1109 if storage_classes_desired and type(storage_classes_desired) is not list: 1110 raise errors.ArgumentError("storage_classes_desired must be list type.") 1111 1112 super(Collection, self).__init__(parent) 1113 self._api_client = api_client 1114 self._keep_client = keep_client 1115 1116 # Use the keep client from ThreadSafeApiCache 1117 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache): 1118 self._keep_client = self._api_client.keep 1119 1120 self._block_manager = block_manager 1121 self.replication_desired = replication_desired 1122 self._storage_classes_desired = storage_classes_desired 1123 self.put_threads = put_threads 1124 1125 if apiconfig: 1126 self._config = apiconfig 1127 else: 1128 self._config = config.settings() 1129 1130 self.num_retries = num_retries 1131 self._manifest_locator = None 1132 self._manifest_text = None 1133 self._portable_data_hash = None 1134 self._api_response = None 1135 self._past_versions = set() 1136 1137 self.lock = threading.RLock() 1138 self.events = None 1139 1140 if manifest_locator_or_text: 1141 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1142 self._manifest_locator = manifest_locator_or_text 1143 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1144 self._manifest_locator = manifest_locator_or_text 1145 if not self._has_local_collection_uuid(): 1146 self._has_remote_blocks = True 1147 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1148 self._manifest_text = manifest_locator_or_text 1149 if '+R' in self._manifest_text: 1150 self._has_remote_blocks = True 1151 else: 1152 raise errors.ArgumentError( 1153 "Argument to CollectionReader is not a manifest or a 
collection UUID") 1154 1155 try: 1156 self._populate() 1157 except errors.SyntaxError as e: 1158 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None 1159 1160 def storage_classes_desired(self) -> List[str]: 1161 """Get this collection's `storage_classes_desired` value""" 1162 return self._storage_classes_desired or [] 1163 1164 def root_collection(self) -> 'Collection': 1165 return self 1166 1167 def get_properties(self) -> Properties: 1168 """Get this collection's properties 1169 1170 This method always returns a dict. If this collection object does not 1171 have an associated API record, or that record does not have any 1172 properties set, this method returns an empty dict. 1173 """ 1174 if self._api_response and self._api_response["properties"]: 1175 return self._api_response["properties"] 1176 else: 1177 return {} 1178 1179 def get_trash_at(self) -> Optional[datetime.datetime]: 1180 """Get this collection's `trash_at` field 1181 1182 This method parses the `trash_at` field of the collection's API 1183 record and returns a datetime from it. If that field is not set, or 1184 this collection object does not have an associated API record, 1185 returns None. 1186 """ 1187 if self._api_response and self._api_response["trash_at"]: 1188 try: 1189 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1190 except ValueError: 1191 return None 1192 else: 1193 return None 1194 1195 def stream_name(self) -> str: 1196 return "." 1197 1198 def writable(self) -> bool: 1199 return True 1200 1201 @synchronized 1202 def known_past_version( 1203 self, 1204 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] 1205 ) -> bool: 1206 """Indicate whether an API record for this collection has been seen before 1207 1208 As this collection object loads records from the API server, it records 1209 their `modified_at` and `portable_data_hash` fields. 
This method accepts 1210 a 2-tuple with values for those fields, and returns `True` if the 1211 combination was previously loaded. 1212 """ 1213 return modified_at_and_portable_data_hash in self._past_versions 1214 1215 @synchronized 1216 @retry_method 1217 def update( 1218 self, 1219 other: Optional['Collection']=None, 1220 num_retries: Optional[int]=None, 1221 ) -> None: 1222 """Merge another collection's contents into this one 1223 1224 This method compares the manifest of this collection instance with 1225 another, then updates this instance's manifest with changes from the 1226 other, renaming files to flag conflicts where necessary. 1227 1228 When called without any arguments, this method reloads the collection's 1229 API record, and updates this instance with any changes that have 1230 appeared server-side. If this instance does not have a corresponding 1231 API record, this method raises `arvados.errors.ArgumentError`. 1232 1233 Arguments: 1234 1235 * other: arvados.collection.Collection | None --- The collection 1236 whose contents should be merged into this instance. When not 1237 provided, this method reloads this collection's API record and 1238 constructs a Collection object from it. If this instance does not 1239 have a corresponding API record, this method raises 1240 `arvados.errors.ArgumentError`. 1241 1242 * num_retries: int | None --- The number of times to retry reloading 1243 the collection's API record from the API server. If not specified, 1244 uses the `num_retries` provided when this instance was constructed. 
1245 """ 1246 if other is None: 1247 if self._manifest_locator is None: 1248 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1249 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1250 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and 1251 response.get("portable_data_hash") != self.portable_data_hash()): 1252 # The record on the server is different from our current one, but we've seen it before, 1253 # so ignore it because it's already been merged. 1254 # However, if it's the same as our current record, proceed with the update, because we want to update 1255 # our tokens. 1256 return 1257 else: 1258 self._remember_api_response(response) 1259 other = CollectionReader(response["manifest_text"]) 1260 baseline = CollectionReader(self._manifest_text) 1261 self.apply(baseline.diff(other)) 1262 self._manifest_text = self.manifest_text() 1263 1264 @synchronized 1265 def _my_api(self): 1266 if self._api_client is None: 1267 self._api_client = ThreadSafeApiCache(self._config, version='v1') 1268 if self._keep_client is None: 1269 self._keep_client = self._api_client.keep 1270 return self._api_client 1271 1272 @synchronized 1273 def _my_keep(self): 1274 if self._keep_client is None: 1275 if self._api_client is None: 1276 self._my_api() 1277 else: 1278 self._keep_client = KeepClient(api_client=self._api_client) 1279 return self._keep_client 1280 1281 @synchronized 1282 def _my_block_manager(self): 1283 if self._block_manager is None: 1284 copies = (self.replication_desired or 1285 self._my_api()._rootDesc.get('defaultCollectionReplication', 1286 2)) 1287 self._block_manager = _BlockManager(self._my_keep(), 1288 copies=copies, 1289 put_threads=self.put_threads, 1290 num_retries=self.num_retries, 1291 storage_classes_func=self.storage_classes_desired) 1292 return self._block_manager 1293 1294 def _remember_api_response(self, 
response): 1295 self._api_response = response 1296 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) 1297 1298 def _populate_from_api_server(self): 1299 # As in KeepClient itself, we must wait until the last 1300 # possible moment to instantiate an API client, in order to 1301 # avoid tripping up clients that don't have access to an API 1302 # server. If we do build one, make sure our Keep client uses 1303 # it. If instantiation fails, we'll fall back to the except 1304 # clause, just like any other Collection lookup 1305 # failure. Return an exception, or None if successful. 1306 self._remember_api_response(self._my_api().collections().get( 1307 uuid=self._manifest_locator).execute( 1308 num_retries=self.num_retries)) 1309 self._manifest_text = self._api_response['manifest_text'] 1310 self._portable_data_hash = self._api_response['portable_data_hash'] 1311 # If not overriden via kwargs, we should try to load the 1312 # replication_desired and storage_classes_desired from the API server 1313 if self.replication_desired is None: 1314 self.replication_desired = self._api_response.get('replication_desired', None) 1315 if self._storage_classes_desired is None: 1316 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None) 1317 1318 def _populate(self): 1319 if self._manifest_text is None: 1320 if self._manifest_locator is None: 1321 return 1322 else: 1323 self._populate_from_api_server() 1324 self._baseline_manifest = self._manifest_text 1325 self._import_manifest(self._manifest_text) 1326 1327 def _has_collection_uuid(self): 1328 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) 1329 1330 def _has_local_collection_uuid(self): 1331 return self._has_collection_uuid and \ 1332 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0] 1333 1334 def __enter__(self): 1335 return self 1336 1337 def __exit__(self, 
exc_type, exc_value, traceback): 1338 """Exit a context with this collection instance 1339 1340 If no exception was raised inside the context block, and this 1341 collection is writable and has a corresponding API record, that 1342 record will be updated to match the state of this instance at the end 1343 of the block. 1344 """ 1345 if exc_type is None: 1346 if self.writable() and self._has_collection_uuid(): 1347 self.save() 1348 self.stop_threads() 1349 1350 def stop_threads(self) -> None: 1351 """Stop background Keep upload/download threads""" 1352 if self._block_manager is not None: 1353 self._block_manager.stop_threads() 1354 1355 @synchronized 1356 def manifest_locator(self) -> Optional[str]: 1357 """Get this collection's manifest locator, if any 1358 1359 * If this collection instance is associated with an API record with a 1360 UUID, return that. 1361 * Otherwise, if this collection instance was loaded from an API record 1362 by portable data hash, return that. 1363 * Otherwise, return `None`. 1364 """ 1365 return self._manifest_locator 1366 1367 @synchronized 1368 def clone( 1369 self, 1370 new_parent: Optional['Collection']=None, 1371 new_name: Optional[str]=None, 1372 readonly: bool=False, 1373 new_config: Optional[Mapping[str, str]]=None, 1374 ) -> 'Collection': 1375 """Create a Collection object with the same contents as this instance 1376 1377 This method creates a new Collection object with contents that match 1378 this instance's. The new collection will not be associated with any API 1379 record. 1380 1381 Arguments: 1382 1383 * new_parent: arvados.collection.Collection | None --- This value is 1384 passed to the new Collection's constructor as the `parent` 1385 argument. 1386 1387 * new_name: str | None --- This value is unused. 1388 1389 * readonly: bool --- If this value is true, this method constructs and 1390 returns a `CollectionReader`. Otherwise, it returns a mutable 1391 `Collection`. Default `False`. 
1392 1393 * new_config: Mapping[str, str] | None --- This value is passed to the 1394 new Collection's constructor as `apiconfig`. If no value is provided, 1395 defaults to the configuration passed to this instance's constructor. 1396 """ 1397 if new_config is None: 1398 new_config = self._config 1399 if readonly: 1400 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1401 else: 1402 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1403 1404 newcollection._clonefrom(self) 1405 return newcollection 1406 1407 @synchronized 1408 def api_response(self) -> Optional[Dict[str, Any]]: 1409 """Get this instance's associated API record 1410 1411 If this Collection instance has an associated API record, return it. 1412 Otherwise, return `None`. 1413 """ 1414 return self._api_response 1415 1416 def find_or_create( 1417 self, 1418 path: str, 1419 create_type: CreateType, 1420 ) -> CollectionItem: 1421 if path == ".": 1422 return self 1423 else: 1424 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type) 1425 1426 def find(self, path: str) -> CollectionItem: 1427 if path == ".": 1428 return self 1429 else: 1430 return super(Collection, self).find(path[2:] if path.startswith("./") else path) 1431 1432 def remove(self, path: str, recursive: bool=False) -> None: 1433 if path == ".": 1434 raise errors.ArgumentError("Cannot remove '.'") 1435 else: 1436 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive) 1437 1438 @must_be_writable 1439 @synchronized 1440 @retry_method 1441 def save( 1442 self, 1443 properties: Optional[Properties]=None, 1444 storage_classes: Optional[StorageClasses]=None, 1445 trash_at: Optional[datetime.datetime]=None, 1446 merge: bool=True, 1447 num_retries: Optional[int]=None, 1448 preserve_version: bool=False, 1449 ) -> str: 1450 """Save collection to an existing API record 1451 1452 This method updates the instance's corresponding 
API record to match 1453 the instance's state. If this instance does not have a corresponding API 1454 record yet, raises `AssertionError`. (To create a new API record, use 1455 `Collection.save_new`.) This method returns the saved collection 1456 manifest. 1457 1458 Arguments: 1459 1460 * properties: dict[str, Any] | None --- If provided, the API record will 1461 be updated with these properties. Note this will completely replace 1462 any existing properties. 1463 1464 * storage_classes: list[str] | None --- If provided, the API record will 1465 be updated with this value in the `storage_classes_desired` field. 1466 This value will also be saved on the instance and used for any 1467 changes that follow. 1468 1469 * trash_at: datetime.datetime | None --- If provided, the API record 1470 will be updated with this value in the `trash_at` field. 1471 1472 * merge: bool --- If `True` (the default), this method will first 1473 reload this collection's API record, and merge any new contents into 1474 this instance before saving changes. See `Collection.update` for 1475 details. 1476 1477 * num_retries: int | None --- The number of times to retry reloading 1478 the collection's API record from the API server. If not specified, 1479 uses the `num_retries` provided when this instance was constructed. 1480 1481 * preserve_version: bool --- This value will be passed to directly 1482 to the underlying API call. If `True`, the Arvados API will 1483 preserve the versions of this collection both immediately before 1484 and after the update. If `True` when the API server is not 1485 configured with collection versioning, this method raises 1486 `arvados.errors.ArgumentError`. 
1487 """ 1488 if properties and type(properties) is not dict: 1489 raise errors.ArgumentError("properties must be dictionary type.") 1490 1491 if storage_classes and type(storage_classes) is not list: 1492 raise errors.ArgumentError("storage_classes must be list type.") 1493 if storage_classes: 1494 self._storage_classes_desired = storage_classes 1495 1496 if trash_at and type(trash_at) is not datetime.datetime: 1497 raise errors.ArgumentError("trash_at must be datetime type.") 1498 1499 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1500 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1501 1502 body={} 1503 if properties: 1504 body["properties"] = properties 1505 if self.storage_classes_desired(): 1506 body["storage_classes_desired"] = self.storage_classes_desired() 1507 if trash_at: 1508 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1509 body["trash_at"] = t 1510 if preserve_version: 1511 body["preserve_version"] = preserve_version 1512 1513 if not self.committed(): 1514 if self._has_remote_blocks: 1515 # Copy any remote blocks to the local cluster. 1516 self._copy_remote_blocks(remote_blocks={}) 1517 self._has_remote_blocks = False 1518 if not self._has_collection_uuid(): 1519 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") 1520 elif not self._has_local_collection_uuid(): 1521 raise AssertionError("Collection manifest_locator is from a remote cluster. 
Use save_new() to save it on the local cluster.") 1522 1523 self._my_block_manager().commit_all() 1524 1525 if merge: 1526 self.update() 1527 1528 text = self.manifest_text(strip=False) 1529 body['manifest_text'] = text 1530 1531 self._remember_api_response(self._my_api().collections().update( 1532 uuid=self._manifest_locator, 1533 body=body 1534 ).execute(num_retries=num_retries)) 1535 self._manifest_text = self._api_response["manifest_text"] 1536 self._portable_data_hash = self._api_response["portable_data_hash"] 1537 self.set_committed(True) 1538 elif body: 1539 self._remember_api_response(self._my_api().collections().update( 1540 uuid=self._manifest_locator, 1541 body=body 1542 ).execute(num_retries=num_retries)) 1543 1544 return self._manifest_text 1545 1546 1547 @must_be_writable 1548 @synchronized 1549 @retry_method 1550 def save_new( 1551 self, 1552 name: Optional[str]=None, 1553 create_collection_record: bool=True, 1554 owner_uuid: Optional[str]=None, 1555 properties: Optional[Properties]=None, 1556 storage_classes: Optional[StorageClasses]=None, 1557 trash_at: Optional[datetime.datetime]=None, 1558 ensure_unique_name: bool=False, 1559 num_retries: Optional[int]=None, 1560 preserve_version: bool=False, 1561 ): 1562 """Save collection to a new API record 1563 1564 This method finishes uploading new data blocks and (optionally) 1565 creates a new API collection record with the provided data. If a new 1566 record is created, this instance becomes associated with that record 1567 for future updates like `save()`. This method returns the saved 1568 collection manifest. 1569 1570 Arguments: 1571 1572 * name: str | None --- The `name` field to use on the new collection 1573 record. If not specified, a generic default name is generated. 1574 1575 * create_collection_record: bool --- If `True` (the default), creates a 1576 collection record on the API server. 
If `False`, the method finishes 1577 all data uploads and only returns the resulting collection manifest 1578 without sending it to the API server. 1579 1580 * owner_uuid: str | None --- The `owner_uuid` field to use on the 1581 new collection record. 1582 1583 * properties: dict[str, Any] | None --- The `properties` field to use on 1584 the new collection record. 1585 1586 * storage_classes: list[str] | None --- The 1587 `storage_classes_desired` field to use on the new collection record. 1588 1589 * trash_at: datetime.datetime | None --- The `trash_at` field to use 1590 on the new collection record. 1591 1592 * ensure_unique_name: bool --- This value is passed directly to the 1593 Arvados API when creating the collection record. If `True`, the API 1594 server may modify the submitted `name` to ensure the collection's 1595 `name`+`owner_uuid` combination is unique. If `False` (the default), 1596 if a collection already exists with this same `name`+`owner_uuid` 1597 combination, creating a collection record will raise a validation 1598 error. 1599 1600 * num_retries: int | None --- The number of times to retry reloading 1601 the collection's API record from the API server. If not specified, 1602 uses the `num_retries` provided when this instance was constructed. 1603 1604 * preserve_version: bool --- This value will be passed to directly 1605 to the underlying API call. If `True`, the Arvados API will 1606 preserve the versions of this collection both immediately before 1607 and after the update. If `True` when the API server is not 1608 configured with collection versioning, this method raises 1609 `arvados.errors.ArgumentError`. 
1610 """ 1611 if properties and type(properties) is not dict: 1612 raise errors.ArgumentError("properties must be dictionary type.") 1613 1614 if storage_classes and type(storage_classes) is not list: 1615 raise errors.ArgumentError("storage_classes must be list type.") 1616 1617 if trash_at and type(trash_at) is not datetime.datetime: 1618 raise errors.ArgumentError("trash_at must be datetime type.") 1619 1620 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1621 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1622 1623 if self._has_remote_blocks: 1624 # Copy any remote blocks to the local cluster. 1625 self._copy_remote_blocks(remote_blocks={}) 1626 self._has_remote_blocks = False 1627 1628 if storage_classes: 1629 self._storage_classes_desired = storage_classes 1630 1631 self._my_block_manager().commit_all() 1632 text = self.manifest_text(strip=False) 1633 1634 if create_collection_record: 1635 if name is None: 1636 name = "New collection" 1637 ensure_unique_name = True 1638 1639 body = {"manifest_text": text, 1640 "name": name, 1641 "replication_desired": self.replication_desired} 1642 if owner_uuid: 1643 body["owner_uuid"] = owner_uuid 1644 if properties: 1645 body["properties"] = properties 1646 if self.storage_classes_desired(): 1647 body["storage_classes_desired"] = self.storage_classes_desired() 1648 if trash_at: 1649 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1650 body["trash_at"] = t 1651 if preserve_version: 1652 body["preserve_version"] = preserve_version 1653 1654 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) 1655 text = self._api_response["manifest_text"] 1656 1657 self._manifest_locator = self._api_response["uuid"] 1658 self._portable_data_hash = self._api_response["portable_data_hash"] 1659 1660 self._manifest_text = text 1661 
self.set_committed(True) 1662 1663 return text 1664 1665 _token_re = re.compile(r'(\S+)(\s+|$)') 1666 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') 1667 _segment_re = re.compile(r'(\d+):(\d+):(\S+)') 1668 1669 def _unescape_manifest_path(self, path): 1670 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) 1671 1672 @synchronized 1673 def _import_manifest(self, manifest_text): 1674 """Import a manifest into a `Collection`. 1675 1676 :manifest_text: 1677 The manifest text to import from. 1678 1679 """ 1680 if len(self) > 0: 1681 raise ArgumentError("Can only import manifest into an empty collection") 1682 1683 STREAM_NAME = 0 1684 BLOCKS = 1 1685 SEGMENTS = 2 1686 1687 stream_name = None 1688 state = STREAM_NAME 1689 1690 for token_and_separator in self._token_re.finditer(manifest_text): 1691 tok = token_and_separator.group(1) 1692 sep = token_and_separator.group(2) 1693 1694 if state == STREAM_NAME: 1695 # starting a new stream 1696 stream_name = self._unescape_manifest_path(tok) 1697 blocks = [] 1698 segments = [] 1699 streamoffset = 0 1700 state = BLOCKS 1701 self.find_or_create(stream_name, COLLECTION) 1702 continue 1703 1704 if state == BLOCKS: 1705 block_locator = self._block_re.match(tok) 1706 if block_locator: 1707 blocksize = int(block_locator.group(1)) 1708 blocks.append(Range(tok, streamoffset, blocksize, 0)) 1709 streamoffset += blocksize 1710 else: 1711 state = SEGMENTS 1712 1713 if state == SEGMENTS: 1714 file_segment = self._segment_re.match(tok) 1715 if file_segment: 1716 pos = int(file_segment.group(1)) 1717 size = int(file_segment.group(2)) 1718 name = self._unescape_manifest_path(file_segment.group(3)) 1719 if name.split('/')[-1] == '.': 1720 # placeholder for persisting an empty directory, not a real file 1721 if len(name) > 2: 1722