arvados.collection
Tools to work with Arvados collections

This module provides high-level interfaces to create, read, and update
Arvados collections. Most users will want to instantiate Collection
objects, and use methods like Collection.open and Collection.mkdirs to
read and write data in the collection. Refer to the Arvados Python SDK
cookbook for an introduction to using the Collection class.
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4"""Tools to work with Arvados collections 5 6This module provides high-level interfaces to create, read, and update 7Arvados collections. Most users will want to instantiate `Collection` 8objects, and use methods like `Collection.open` and `Collection.mkdirs` to 9read and write data in the collection. Refer to the Arvados Python SDK 10cookbook for [an introduction to using the Collection class][cookbook]. 11 12[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 13""" 14 15from __future__ import absolute_import 16from future.utils import listitems, listvalues, viewkeys 17from builtins import str 18from past.builtins import basestring 19from builtins import object 20import ciso8601 21import datetime 22import errno 23import functools 24import hashlib 25import io 26import logging 27import os 28import re 29import sys 30import threading 31import time 32 33from collections import deque 34from stat import * 35 36from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock 37from .keep import KeepLocator, KeepClient 38from .stream import StreamReader 39from ._normalize_stream import normalize_stream, escape 40from ._ranges import Range, LocatorAndRange 41from .safeapi import ThreadSafeApiCache 42import arvados.config as config 43import arvados.errors as errors 44import arvados.util 45import arvados.events as events 46from arvados.retry import retry_method 47 48from typing import ( 49 Any, 50 Callable, 51 Dict, 52 IO, 53 Iterator, 54 List, 55 Mapping, 56 Optional, 57 Tuple, 58 Union, 59) 60 61if sys.version_info < (3, 8): 62 from typing_extensions import Literal 63else: 64 from typing import Literal 65 66_logger = logging.getLogger('arvados.collection') 67 68ADD = "add" 69"""Argument value for `Collection` methods to represent an 
added item""" 70DEL = "del" 71"""Argument value for `Collection` methods to represent a removed item""" 72MOD = "mod" 73"""Argument value for `Collection` methods to represent a modified item""" 74TOK = "tok" 75"""Argument value for `Collection` methods to represent an item with token differences""" 76FILE = "file" 77"""`create_type` value for `Collection.find_or_create`""" 78COLLECTION = "collection" 79"""`create_type` value for `Collection.find_or_create`""" 80 81ChangeList = List[Union[ 82 Tuple[Literal[ADD, DEL], str, 'Collection'], 83 Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'], 84]] 85ChangeType = Literal[ADD, DEL, MOD, TOK] 86CollectionItem = Union[ArvadosFile, 'Collection'] 87ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object] 88CreateType = Literal[COLLECTION, FILE] 89Properties = Dict[str, Any] 90StorageClasses = List[str] 91 92class CollectionBase(object): 93 """Abstract base class for Collection classes 94 95 .. ATTENTION:: Internal 96 This class is meant to be used by other parts of the SDK. User code 97 should instantiate or subclass `Collection` or one of its subclasses 98 directly. 99 """ 100 101 def __enter__(self): 102 """Enter a context block with this collection instance""" 103 return self 104 105 def __exit__(self, exc_type, exc_value, traceback): 106 """Exit a context block with this collection instance""" 107 pass 108 109 def _my_keep(self): 110 if self._keep_client is None: 111 self._keep_client = KeepClient(api_client=self._api_client, 112 num_retries=self.num_retries) 113 return self._keep_client 114 115 def stripped_manifest(self) -> str: 116 """Create a copy of the collection manifest with only size hints 117 118 This method returns a string with the current collection's manifest 119 text with all non-portable locator hints like permission hints and 120 remote cluster hints removed. The only hints in the returned manifest 121 will be size hints. 
122 """ 123 raw = self.manifest_text() 124 clean = [] 125 for line in raw.split("\n"): 126 fields = line.split() 127 if fields: 128 clean_fields = fields[:1] + [ 129 (re.sub(r'\+[^\d][^\+]*', '', x) 130 if re.match(arvados.util.keep_locator_pattern, x) 131 else x) 132 for x in fields[1:]] 133 clean += [' '.join(clean_fields), "\n"] 134 return ''.join(clean) 135 136 137class _WriterFile(_FileLikeObjectBase): 138 def __init__(self, coll_writer, name): 139 super(_WriterFile, self).__init__(name, 'wb') 140 self.dest = coll_writer 141 142 def close(self): 143 super(_WriterFile, self).close() 144 self.dest.finish_current_file() 145 146 @_FileLikeObjectBase._before_close 147 def write(self, data): 148 self.dest.write(data) 149 150 @_FileLikeObjectBase._before_close 151 def writelines(self, seq): 152 for data in seq: 153 self.write(data) 154 155 @_FileLikeObjectBase._before_close 156 def flush(self): 157 self.dest.flush_data() 158 159 160class RichCollectionBase(CollectionBase): 161 """Base class for Collection classes 162 163 .. ATTENTION:: Internal 164 This class is meant to be used by other parts of the SDK. User code 165 should instantiate or subclass `Collection` or one of its subclasses 166 directly. 167 """ 168 169 def __init__(self, parent=None): 170 self.parent = parent 171 self._committed = False 172 self._has_remote_blocks = False 173 self._callback = None 174 self._items = {} 175 176 def _my_api(self): 177 raise NotImplementedError() 178 179 def _my_keep(self): 180 raise NotImplementedError() 181 182 def _my_block_manager(self): 183 raise NotImplementedError() 184 185 def writable(self) -> bool: 186 """Indicate whether this collection object can be modified 187 188 This method returns `False` if this object is a `CollectionReader`, 189 else `True`. 
190 """ 191 raise NotImplementedError() 192 193 def root_collection(self) -> 'Collection': 194 """Get this collection's root collection object 195 196 If you open a subcollection with `Collection.find`, calling this method 197 on that subcollection returns the source Collection object. 198 """ 199 raise NotImplementedError() 200 201 def stream_name(self) -> str: 202 """Get the name of the manifest stream represented by this collection 203 204 If you open a subcollection with `Collection.find`, calling this method 205 on that subcollection returns the name of the stream you opened. 206 """ 207 raise NotImplementedError() 208 209 @synchronized 210 def has_remote_blocks(self) -> bool: 211 """Indiciate whether the collection refers to remote data 212 213 Returns `True` if the collection manifest includes any Keep locators 214 with a remote hint (`+R`), else `False`. 215 """ 216 if self._has_remote_blocks: 217 return True 218 for item in self: 219 if self[item].has_remote_blocks(): 220 return True 221 return False 222 223 @synchronized 224 def set_has_remote_blocks(self, val: bool) -> None: 225 """Cache whether this collection refers to remote blocks 226 227 .. ATTENTION:: Internal 228 This method is only meant to be used by other Collection methods. 229 230 Set this collection's cached "has remote blocks" flag to the given 231 value. 232 """ 233 self._has_remote_blocks = val 234 if self.parent: 235 self.parent.set_has_remote_blocks(val) 236 237 @must_be_writable 238 @synchronized 239 def find_or_create( 240 self, 241 path: str, 242 create_type: CreateType, 243 ) -> CollectionItem: 244 """Get the item at the given path, creating it if necessary 245 246 If `path` refers to a stream in this collection, returns a 247 corresponding `Subcollection` object. If `path` refers to a file in 248 this collection, returns a corresponding 249 `arvados.arvfile.ArvadosFile` object. 
If `path` does not exist in 250 this collection, then this method creates a new object and returns 251 it, creating parent streams as needed. The type of object created is 252 determined by the value of `create_type`. 253 254 Arguments: 255 256 * path: str --- The path to find or create within this collection. 257 258 * create_type: Literal[COLLECTION, FILE] --- The type of object to 259 create at `path` if one does not exist. Passing `COLLECTION` 260 creates a stream and returns the corresponding 261 `Subcollection`. Passing `FILE` creates a new file and returns the 262 corresponding `arvados.arvfile.ArvadosFile`. 263 """ 264 pathcomponents = path.split("/", 1) 265 if pathcomponents[0]: 266 item = self._items.get(pathcomponents[0]) 267 if len(pathcomponents) == 1: 268 if item is None: 269 # create new file 270 if create_type == COLLECTION: 271 item = Subcollection(self, pathcomponents[0]) 272 else: 273 item = ArvadosFile(self, pathcomponents[0]) 274 self._items[pathcomponents[0]] = item 275 self.set_committed(False) 276 self.notify(ADD, self, pathcomponents[0], item) 277 return item 278 else: 279 if item is None: 280 # create new collection 281 item = Subcollection(self, pathcomponents[0]) 282 self._items[pathcomponents[0]] = item 283 self.set_committed(False) 284 self.notify(ADD, self, pathcomponents[0], item) 285 if isinstance(item, RichCollectionBase): 286 return item.find_or_create(pathcomponents[1], create_type) 287 else: 288 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 289 else: 290 return self 291 292 @synchronized 293 def find(self, path: str) -> CollectionItem: 294 """Get the item at the given path 295 296 If `path` refers to a stream in this collection, returns a 297 corresponding `Subcollection` object. If `path` refers to a file in 298 this collection, returns a corresponding 299 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 300 this collection, then this method raises `NotADirectoryError`. 
301 302 Arguments: 303 304 * path: str --- The path to find or create within this collection. 305 """ 306 if not path: 307 raise errors.ArgumentError("Parameter 'path' is empty.") 308 309 pathcomponents = path.split("/", 1) 310 if pathcomponents[0] == '': 311 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 312 313 item = self._items.get(pathcomponents[0]) 314 if item is None: 315 return None 316 elif len(pathcomponents) == 1: 317 return item 318 else: 319 if isinstance(item, RichCollectionBase): 320 if pathcomponents[1]: 321 return item.find(pathcomponents[1]) 322 else: 323 return item 324 else: 325 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 326 327 @synchronized 328 def mkdirs(self, path: str) -> 'Subcollection': 329 """Create and return a subcollection at `path` 330 331 If `path` exists within this collection, raises `FileExistsError`. 332 Otherwise, creates a stream at that path and returns the 333 corresponding `Subcollection`. 334 """ 335 if self.find(path) != None: 336 raise IOError(errno.EEXIST, "Directory or file exists", path) 337 338 return self.find_or_create(path, COLLECTION) 339 340 def open( 341 self, 342 path: str, 343 mode: str="r", 344 encoding: Optional[str]=None, 345 ) -> IO: 346 """Open a file-like object within the collection 347 348 This method returns a file-like object that can read and/or write the 349 file located at `path` within the collection. If you attempt to write 350 a `path` that does not exist, the file is created with `find_or_create`. 351 If the file cannot be opened for any other reason, this method raises 352 `OSError` with an appropriate errno. 353 354 Arguments: 355 356 * path: str --- The path of the file to open within this collection 357 358 * mode: str --- The mode to open this file. Supports all the same 359 values as `builtins.open`. 360 361 * encoding: str | None --- The text encoding of the file. Only used 362 when the file is opened in text mode. 
The default is 363 platform-dependent. 364 """ 365 if not re.search(r'^[rwa][bt]?\+?$', mode): 366 raise errors.ArgumentError("Invalid mode {!r}".format(mode)) 367 368 if mode[0] == 'r' and '+' not in mode: 369 fclass = ArvadosFileReader 370 arvfile = self.find(path) 371 elif not self.writable(): 372 raise IOError(errno.EROFS, "Collection is read only") 373 else: 374 fclass = ArvadosFileWriter 375 arvfile = self.find_or_create(path, FILE) 376 377 if arvfile is None: 378 raise IOError(errno.ENOENT, "File not found", path) 379 if not isinstance(arvfile, ArvadosFile): 380 raise IOError(errno.EISDIR, "Is a directory", path) 381 382 if mode[0] == 'w': 383 arvfile.truncate(0) 384 385 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) 386 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) 387 if 'b' not in mode: 388 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader 389 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) 390 return f 391 392 def modified(self) -> bool: 393 """Indicate whether this collection has an API server record 394 395 Returns `False` if this collection corresponds to a record loaded from 396 the API server, `True` otherwise. 397 """ 398 return not self.committed() 399 400 @synchronized 401 def committed(self): 402 """Indicate whether this collection has an API server record 403 404 Returns `True` if this collection corresponds to a record loaded from 405 the API server, `False` otherwise. 406 """ 407 return self._committed 408 409 @synchronized 410 def set_committed(self, value: bool=True): 411 """Cache whether this collection has an API server record 412 413 .. ATTENTION:: Internal 414 This method is only meant to be used by other Collection methods. 415 416 Set this collection's cached "committed" flag to the given 417 value and propagates it as needed. 
418 """ 419 if value == self._committed: 420 return 421 if value: 422 for k,v in listitems(self._items): 423 v.set_committed(True) 424 self._committed = True 425 else: 426 self._committed = False 427 if self.parent is not None: 428 self.parent.set_committed(False) 429 430 @synchronized 431 def __iter__(self) -> Iterator[str]: 432 """Iterate names of streams and files in this collection 433 434 This method does not recurse. It only iterates the contents of this 435 collection's corresponding stream. 436 """ 437 return iter(viewkeys(self._items)) 438 439 @synchronized 440 def __getitem__(self, k: str) -> CollectionItem: 441 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection 442 443 This method does not recurse. If you want to search a path, use 444 `RichCollectionBase.find` instead. 445 """ 446 return self._items[k] 447 448 @synchronized 449 def __contains__(self, k: str) -> bool: 450 """Indicate whether this collection has an item with this name 451 452 This method does not recurse. It you want to check a path, use 453 `RichCollectionBase.exists` instead. 454 """ 455 return k in self._items 456 457 @synchronized 458 def __len__(self): 459 """Get the number of items directly contained in this collection 460 461 This method does not recurse. It only counts the streams and files 462 in this collection's corresponding stream. 463 """ 464 return len(self._items) 465 466 @must_be_writable 467 @synchronized 468 def __delitem__(self, p: str) -> None: 469 """Delete an item from this collection's stream 470 471 This method does not recurse. If you want to remove an item by a 472 path, use `RichCollectionBase.remove` instead. 473 """ 474 del self._items[p] 475 self.set_committed(False) 476 self.notify(DEL, self, p, None) 477 478 @synchronized 479 def keys(self) -> Iterator[str]: 480 """Iterate names of streams and files in this collection 481 482 This method does not recurse. It only iterates the contents of this 483 collection's corresponding stream. 
484 """ 485 return self._items.keys() 486 487 @synchronized 488 def values(self) -> List[CollectionItem]: 489 """Get a list of objects in this collection's stream 490 491 The return value includes a `Subcollection` for every stream, and an 492 `arvados.arvfile.ArvadosFile` for every file, directly within this 493 collection's stream. This method does not recurse. 494 """ 495 return listvalues(self._items) 496 497 @synchronized 498 def items(self) -> List[Tuple[str, CollectionItem]]: 499 """Get a list of `(name, object)` tuples from this collection's stream 500 501 The return value includes a `Subcollection` for every stream, and an 502 `arvados.arvfile.ArvadosFile` for every file, directly within this 503 collection's stream. This method does not recurse. 504 """ 505 return listitems(self._items) 506 507 def exists(self, path: str) -> bool: 508 """Indicate whether this collection includes an item at `path` 509 510 This method returns `True` if `path` refers to a stream or file within 511 this collection, else `False`. 512 513 Arguments: 514 515 * path: str --- The path to check for existence within this collection 516 """ 517 return self.find(path) is not None 518 519 @must_be_writable 520 @synchronized 521 def remove(self, path: str, recursive: bool=False) -> None: 522 """Remove the file or stream at `path` 523 524 Arguments: 525 526 * path: str --- The path of the item to remove from the collection 527 528 * recursive: bool --- Controls the method's behavior if `path` refers 529 to a nonempty stream. If `False` (the default), this method raises 530 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all 531 items under the stream. 
532 """ 533 if not path: 534 raise errors.ArgumentError("Parameter 'path' is empty.") 535 536 pathcomponents = path.split("/", 1) 537 item = self._items.get(pathcomponents[0]) 538 if item is None: 539 raise IOError(errno.ENOENT, "File not found", path) 540 if len(pathcomponents) == 1: 541 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: 542 raise IOError(errno.ENOTEMPTY, "Directory not empty", path) 543 deleteditem = self._items[pathcomponents[0]] 544 del self._items[pathcomponents[0]] 545 self.set_committed(False) 546 self.notify(DEL, self, pathcomponents[0], deleteditem) 547 else: 548 item.remove(pathcomponents[1], recursive=recursive) 549 550 def _clonefrom(self, source): 551 for k,v in listitems(source): 552 self._items[k] = v.clone(self, k) 553 554 def clone(self): 555 raise NotImplementedError() 556 557 @must_be_writable 558 @synchronized 559 def add( 560 self, 561 source_obj: CollectionItem, 562 target_name: str, 563 overwrite: bool=False, 564 reparent: bool=False, 565 ) -> None: 566 """Copy or move a file or subcollection object to this collection 567 568 Arguments: 569 570 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection 571 to add to this collection 572 573 * target_name: str --- The path inside this collection where 574 `source_obj` should be added. 575 576 * overwrite: bool --- Controls the behavior of this method when the 577 collection already contains an object at `target_name`. If `False` 578 (the default), this method will raise `FileExistsError`. If `True`, 579 the object at `target_name` will be replaced with `source_obj`. 580 581 * reparent: bool --- Controls whether this method copies or moves 582 `source_obj`. If `False` (the default), `source_obj` is copied into 583 this collection. If `True`, `source_obj` is moved into this 584 collection. 
585 """ 586 if target_name in self and not overwrite: 587 raise IOError(errno.EEXIST, "File already exists", target_name) 588 589 modified_from = None 590 if target_name in self: 591 modified_from = self[target_name] 592 593 # Actually make the move or copy. 594 if reparent: 595 source_obj._reparent(self, target_name) 596 item = source_obj 597 else: 598 item = source_obj.clone(self, target_name) 599 600 self._items[target_name] = item 601 self.set_committed(False) 602 if not self._has_remote_blocks and source_obj.has_remote_blocks(): 603 self.set_has_remote_blocks(True) 604 605 if modified_from: 606 self.notify(MOD, self, target_name, (modified_from, item)) 607 else: 608 self.notify(ADD, self, target_name, item) 609 610 def _get_src_target(self, source, target_path, source_collection, create_dest): 611 if source_collection is None: 612 source_collection = self 613 614 # Find the object 615 if isinstance(source, basestring): 616 source_obj = source_collection.find(source) 617 if source_obj is None: 618 raise IOError(errno.ENOENT, "File not found", source) 619 sourcecomponents = source.split("/") 620 else: 621 source_obj = source 622 sourcecomponents = None 623 624 # Find parent collection the target path 625 targetcomponents = target_path.split("/") 626 627 # Determine the name to use. 628 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1] 629 630 if not target_name: 631 raise errors.ArgumentError("Target path is empty and source is an object. 
Cannot determine destination filename to use.") 632 633 if create_dest: 634 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) 635 else: 636 if len(targetcomponents) > 1: 637 target_dir = self.find("/".join(targetcomponents[0:-1])) 638 else: 639 target_dir = self 640 641 if target_dir is None: 642 raise IOError(errno.ENOENT, "Target directory not found", target_name) 643 644 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents: 645 target_dir = target_dir[target_name] 646 target_name = sourcecomponents[-1] 647 648 return (source_obj, target_dir, target_name) 649 650 @must_be_writable 651 @synchronized 652 def copy( 653 self, 654 source: Union[str, CollectionItem], 655 target_path: str, 656 source_collection: Optional['RichCollectionBase']=None, 657 overwrite: bool=False, 658 ) -> None: 659 """Copy a file or subcollection object to this collection 660 661 Arguments: 662 663 * source: str | arvados.arvfile.ArvadosFile | 664 arvados.collection.Subcollection --- The file or subcollection to 665 add to this collection. If `source` is a str, the object will be 666 found by looking up this path from `source_collection` (see 667 below). 668 669 * target_path: str --- The path inside this collection where the 670 source object should be added. 671 672 * source_collection: arvados.collection.Collection | None --- The 673 collection to find the source object from when `source` is a 674 path. Defaults to the current collection (`self`). 675 676 * overwrite: bool --- Controls the behavior of this method when the 677 collection already contains an object at `target_path`. If `False` 678 (the default), this method will raise `FileExistsError`. If `True`, 679 the object at `target_path` will be replaced with `source_obj`. 
680 """ 681 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) 682 target_dir.add(source_obj, target_name, overwrite, False) 683 684 @must_be_writable 685 @synchronized 686 def rename( 687 self, 688 source: Union[str, CollectionItem], 689 target_path: str, 690 source_collection: Optional['RichCollectionBase']=None, 691 overwrite: bool=False, 692 ) -> None: 693 """Move a file or subcollection object to this collection 694 695 Arguments: 696 697 * source: str | arvados.arvfile.ArvadosFile | 698 arvados.collection.Subcollection --- The file or subcollection to 699 add to this collection. If `source` is a str, the object will be 700 found by looking up this path from `source_collection` (see 701 below). 702 703 * target_path: str --- The path inside this collection where the 704 source object should be added. 705 706 * source_collection: arvados.collection.Collection | None --- The 707 collection to find the source object from when `source` is a 708 path. Defaults to the current collection (`self`). 709 710 * overwrite: bool --- Controls the behavior of this method when the 711 collection already contains an object at `target_path`. If `False` 712 (the default), this method will raise `FileExistsError`. If `True`, 713 the object at `target_path` will be replaced with `source_obj`. 714 """ 715 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) 716 if not source_obj.writable(): 717 raise IOError(errno.EROFS, "Source collection is read only", source) 718 target_dir.add(source_obj, target_name, overwrite, True) 719 720 def portable_manifest_text(self, stream_name: str=".") -> str: 721 """Get the portable manifest text for this collection 722 723 The portable manifest text is normalized, and does not include access 724 tokens. This method does not flush outstanding blocks to Keep. 
725 726 Arguments: 727 728 * stream_name: str --- The name to use for this collection's stream in 729 the generated manifest. Default `'.'`. 730 """ 731 return self._get_manifest_text(stream_name, True, True) 732 733 @synchronized 734 def manifest_text( 735 self, 736 stream_name: str=".", 737 strip: bool=False, 738 normalize: bool=False, 739 only_committed: bool=False, 740 ) -> str: 741 """Get the manifest text for this collection 742 743 Arguments: 744 745 * stream_name: str --- The name to use for this collection's stream in 746 the generated manifest. Default `'.'`. 747 748 * strip: bool --- Controls whether or not the returned manifest text 749 includes access tokens. If `False` (the default), the manifest text 750 will include access tokens. If `True`, the manifest text will not 751 include access tokens. 752 753 * normalize: bool --- Controls whether or not the returned manifest 754 text is normalized. Default `False`. 755 756 * only_committed: bool --- Controls whether or not this method uploads 757 pending data to Keep before building and returning the manifest text. 758 If `False` (the default), this method will finish uploading all data 759 to Keep, then return the final manifest. If `True`, this method will 760 build and return a manifest that only refers to the data that has 761 finished uploading at the time this method was called. 762 """ 763 if not only_committed: 764 self._my_block_manager().commit_all() 765 return self._get_manifest_text(stream_name, strip, normalize, 766 only_committed=only_committed) 767 768 @synchronized 769 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 770 """Get the manifest text for this collection, sub collections and files. 771 772 :stream_name: 773 Name to use for this stream (directory) 774 775 :strip: 776 If True, remove signing tokens from block locators if present. 777 If False (default), block locators are left unchanged. 
778 779 :normalize: 780 If True, always export the manifest text in normalized form 781 even if the Collection is not modified. If False (default) and the collection 782 is not modified, return the original manifest text even if it is not 783 in normalized form. 784 785 :only_committed: 786 If True, only include blocks that were already committed to Keep. 787 788 """ 789 790 if not self.committed() or self._manifest_text is None or normalize: 791 stream = {} 792 buf = [] 793 sorted_keys = sorted(self.keys()) 794 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]: 795 # Create a stream per file `k` 796 arvfile = self[filename] 797 filestream = [] 798 for segment in arvfile.segments(): 799 loc = segment.locator 800 if arvfile.parent._my_block_manager().is_bufferblock(loc): 801 if only_committed: 802 continue 803 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator() 804 if strip: 805 loc = KeepLocator(loc).stripped() 806 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 807 segment.segment_offset, segment.range_size)) 808 stream[filename] = filestream 809 if stream: 810 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n") 811 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]: 812 buf.append(self[dirname].manifest_text( 813 stream_name=os.path.join(stream_name, dirname), 814 strip=strip, normalize=True, only_committed=only_committed)) 815 return "".join(buf) 816 else: 817 if strip: 818 return self.stripped_manifest() 819 else: 820 return self._manifest_text 821 822 @synchronized 823 def _copy_remote_blocks(self, remote_blocks={}): 824 """Scan through the entire collection and ask Keep to copy remote blocks. 825 826 When accessing a remote collection, blocks will have a remote signature 827 (+R instead of +A). Collect these signatures and request Keep to copy the 828 blocks to the local cluster, returning local (+A) signatures. 
829 830 :remote_blocks: 831 Shared cache of remote to local block mappings. This is used to avoid 832 doing extra work when blocks are shared by more than one file in 833 different subdirectories. 834 835 """ 836 for item in self: 837 remote_blocks = self[item]._copy_remote_blocks(remote_blocks) 838 return remote_blocks 839 840 @synchronized 841 def diff( 842 self, 843 end_collection: 'RichCollectionBase', 844 prefix: str=".", 845 holding_collection: Optional['Collection']=None, 846 ) -> ChangeList: 847 """Build a list of differences between this collection and another 848 849 Arguments: 850 851 * end_collection: arvados.collection.RichCollectionBase --- A 852 collection object with the desired end state. The returned diff 853 list will describe how to go from the current collection object 854 `self` to `end_collection`. 855 856 * prefix: str --- The name to use for this collection's stream in 857 the diff list. Default `'.'`. 858 859 * holding_collection: arvados.collection.Collection | None --- A 860 collection object used to hold objects for the returned diff 861 list. By default, a new empty collection is created. 
862 """ 863 changes = [] 864 if holding_collection is None: 865 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep()) 866 for k in self: 867 if k not in end_collection: 868 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, ""))) 869 for k in end_collection: 870 if k in self: 871 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection): 872 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection)) 873 elif end_collection[k] != self[k]: 874 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 875 else: 876 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 877 else: 878 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, ""))) 879 return changes 880 881 @must_be_writable 882 @synchronized 883 def apply(self, changes: ChangeList) -> None: 884 """Apply a list of changes from to this collection 885 886 This method takes a list of changes generated by 887 `RichCollectionBase.diff` and applies it to this 888 collection. Afterward, the state of this collection object will 889 match the state of `end_collection` passed to `diff`. If a change 890 conflicts with a local change, it will be saved to an alternate path 891 indicating the conflict. 892 893 Arguments: 894 895 * changes: arvados.collection.ChangeList --- The list of differences 896 generated by `RichCollectionBase.diff`. 
897 """ 898 if changes: 899 self.set_committed(False) 900 for change in changes: 901 event_type = change[0] 902 path = change[1] 903 initial = change[2] 904 local = self.find(path) 905 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S", 906 time.gmtime())) 907 if event_type == ADD: 908 if local is None: 909 # No local file at path, safe to copy over new file 910 self.copy(initial, path) 911 elif local is not None and local != initial: 912 # There is already local file and it is different: 913 # save change to conflict file. 914 self.copy(initial, conflictpath) 915 elif event_type == MOD or event_type == TOK: 916 final = change[3] 917 if local == initial: 918 # Local matches the "initial" item so it has not 919 # changed locally and is safe to update. 920 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile): 921 # Replace contents of local file with new contents 922 local.replace_contents(final) 923 else: 924 # Overwrite path with new item; this can happen if 925 # path was a file and is now a collection or vice versa 926 self.copy(final, path, overwrite=True) 927 else: 928 # Local is missing (presumably deleted) or local doesn't 929 # match the "start" value, so save change to conflict file 930 self.copy(final, conflictpath) 931 elif event_type == DEL: 932 if local == initial: 933 # Local item matches "initial" value, so it is safe to remove. 934 self.remove(path, recursive=True) 935 # else, the file is modified or already removed, in either 936 # case we don't want to try to remove it. 937 938 def portable_data_hash(self) -> str: 939 """Get the portable data hash for this collection's manifest""" 940 if self._manifest_locator and self.committed(): 941 # If the collection is already saved on the API server, and it's committed 942 # then return API server's PDH response. 
            return self._portable_data_hash
        else:
            # Not saved/committed: compute the PDH locally from the
            # stripped (signature-free) manifest text.
            stripped = self.portable_manifest_text().encode()
            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

    @synchronized
    def subscribe(self, callback: ChangeCallback) -> None:
        """Set a notify callback for changes to this collection

        Only one callback may be registered at a time; a second call raises
        `arvados.errors.ArgumentError`.

        Arguments:

        * callback: arvados.collection.ChangeCallback --- The callable to
          call each time the collection is changed.
        """
        if self._callback is None:
            self._callback = callback
        else:
            raise errors.ArgumentError("A callback is already set on this collection.")

    @synchronized
    def unsubscribe(self) -> None:
        """Remove any notify callback set for changes to this collection"""
        if self._callback is not None:
            self._callback = None

    @synchronized
    def notify(
            self,
            event: ChangeType,
            collection: 'RichCollectionBase',
            name: str,
            item: CollectionItem,
    ) -> None:
        """Notify any subscribed callback about a change to this collection

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        If a callback has been registered with `RichCollectionBase.subscribe`,
        it will be called with information about a change to this collection.
        Then this notification will be propagated to this collection's root.

        Arguments:

        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
          the collection.

        * collection: arvados.collection.RichCollectionBase --- The
          collection that was modified.

        * name: str --- The name of the file or stream within `collection` that
          was modified.

        * item: arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The new contents at `name`
          within `collection`.
        """
        if self._callback:
            self._callback(event, collection, name, item)
        # Bubble the event up so subscribers on the root collection see
        # changes made anywhere in the tree.
        self.root_collection().notify(event, collection, name, item)

    @synchronized
    def __eq__(self, other: Any) -> bool:
        """Indicate whether this collection object is equal to another"""
        if other is self:
            return True
        if not isinstance(other, RichCollectionBase):
            return False
        if len(self._items) != len(other):
            return False
        # Deep comparison: every entry must exist in both and compare equal.
        # NOTE(review): defining __eq__ without __hash__ makes instances
        # unhashable, which appears intentional for mutable collections.
        for k in self._items:
            if k not in other:
                return False
            if self._items[k] != other[k]:
                return False
        return True

    def __ne__(self, other: Any) -> bool:
        """Indicate whether this collection object is not equal to another"""
        return not self.__eq__(other)

    @synchronized
    def flush(self) -> None:
        """Upload any pending data to Keep"""
        for e in listvalues(self):
            e.flush()


class Collection(RichCollectionBase):
    """Read and manipulate an Arvados collection

    This class provides a high-level interface to create, read, and update
    Arvados collections and their contents. Refer to the Arvados Python SDK
    cookbook for [an introduction to using the Collection class][cookbook].
1037 1038 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 1039 """ 1040 1041 def __init__(self, manifest_locator_or_text: Optional[str]=None, 1042 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, 1043 keep_client: Optional['arvados.keep.KeepClient']=None, 1044 num_retries: int=10, 1045 parent: Optional['Collection']=None, 1046 apiconfig: Optional[Mapping[str, str]]=None, 1047 block_manager: Optional['arvados.arvfile._BlockManager']=None, 1048 replication_desired: Optional[int]=None, 1049 storage_classes_desired: Optional[List[str]]=None, 1050 put_threads: Optional[int]=None): 1051 """Initialize a Collection object 1052 1053 Arguments: 1054 1055 * manifest_locator_or_text: str | None --- This string can contain a 1056 collection manifest text, portable data hash, or UUID. When given a 1057 portable data hash or UUID, this instance will load a collection 1058 record from the API server. Otherwise, this instance will represent a 1059 new collection without an API server record. The default value `None` 1060 instantiates a new collection with an empty manifest. 1061 1062 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The 1063 Arvados API client object this instance uses to make requests. If 1064 none is given, this instance creates its own client using the 1065 settings from `apiconfig` (see below). If your client instantiates 1066 many Collection objects, you can help limit memory utilization by 1067 calling `arvados.api.api` to construct an 1068 `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client` 1069 for every Collection. 1070 1071 * keep_client: arvados.keep.KeepClient | None --- The Keep client 1072 object this instance uses to make requests. If none is given, this 1073 instance creates its own client using its `api_client`. 1074 1075 * num_retries: int --- The number of times that client requests are 1076 retried. Default 10. 
1077 1078 * parent: arvados.collection.Collection | None --- The parent Collection 1079 object of this instance, if any. This argument is primarily used by 1080 other Collection methods; user client code shouldn't need to use it. 1081 1082 * apiconfig: Mapping[str, str] | None --- A mapping with entries for 1083 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally 1084 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the 1085 Collection object constructs one from these settings. If no 1086 mapping is provided, calls `arvados.config.settings` to get these 1087 parameters from user configuration. 1088 1089 * block_manager: arvados.arvfile._BlockManager | None --- The 1090 _BlockManager object used by this instance to coordinate reading 1091 and writing Keep data blocks. If none is given, this instance 1092 constructs its own. This argument is primarily used by other 1093 Collection methods; user client code shouldn't need to use it. 1094 1095 * replication_desired: int | None --- This controls both the value of 1096 the `replication_desired` field on API collection records saved by 1097 this class, as well as the number of Keep services that the object 1098 writes new data blocks to. If none is given, uses the default value 1099 configured for the cluster. 1100 1101 * storage_classes_desired: list[str] | None --- This controls both 1102 the value of the `storage_classes_desired` field on API collection 1103 records saved by this class, as well as selecting which specific 1104 Keep services the object writes new data blocks to. If none is 1105 given, defaults to an empty list. 1106 1107 * put_threads: int | None --- The number of threads to run 1108 simultaneously to upload data blocks to Keep. This value is used when 1109 building a new `block_manager`. It is unused when a `block_manager` 1110 is provided. 
1111 """ 1112 1113 if storage_classes_desired and type(storage_classes_desired) is not list: 1114 raise errors.ArgumentError("storage_classes_desired must be list type.") 1115 1116 super(Collection, self).__init__(parent) 1117 self._api_client = api_client 1118 self._keep_client = keep_client 1119 1120 # Use the keep client from ThreadSafeApiCache 1121 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache): 1122 self._keep_client = self._api_client.keep 1123 1124 self._block_manager = block_manager 1125 self.replication_desired = replication_desired 1126 self._storage_classes_desired = storage_classes_desired 1127 self.put_threads = put_threads 1128 1129 if apiconfig: 1130 self._config = apiconfig 1131 else: 1132 self._config = config.settings() 1133 1134 self.num_retries = num_retries 1135 self._manifest_locator = None 1136 self._manifest_text = None 1137 self._portable_data_hash = None 1138 self._api_response = None 1139 self._past_versions = set() 1140 1141 self.lock = threading.RLock() 1142 self.events = None 1143 1144 if manifest_locator_or_text: 1145 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1146 self._manifest_locator = manifest_locator_or_text 1147 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1148 self._manifest_locator = manifest_locator_or_text 1149 if not self._has_local_collection_uuid(): 1150 self._has_remote_blocks = True 1151 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1152 self._manifest_text = manifest_locator_or_text 1153 if '+R' in self._manifest_text: 1154 self._has_remote_blocks = True 1155 else: 1156 raise errors.ArgumentError( 1157 "Argument to CollectionReader is not a manifest or a collection UUID") 1158 1159 try: 1160 self._populate() 1161 except errors.SyntaxError as e: 1162 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None 1163 1164 def storage_classes_desired(self) -> List[str]: 1165 
        """Get this collection's `storage_classes_desired` value"""
        return self._storage_classes_desired or []

    def root_collection(self) -> 'Collection':
        """Return the root of the collection tree: this instance itself"""
        return self

    def get_properties(self) -> Properties:
        """Get this collection's properties

        This method always returns a dict. If this collection object does not
        have an associated API record, or that record does not have any
        properties set, this method returns an empty dict.
        """
        if self._api_response and self._api_response["properties"]:
            return self._api_response["properties"]
        else:
            return {}

    def get_trash_at(self) -> Optional[datetime.datetime]:
        """Get this collection's `trash_at` field

        This method parses the `trash_at` field of the collection's API
        record and returns a datetime from it. If that field is not set, or
        this collection object does not have an associated API record,
        returns None.
        """
        if self._api_response and self._api_response["trash_at"]:
            try:
                return ciso8601.parse_datetime(self._api_response["trash_at"])
            except ValueError:
                # Unparseable timestamp from the server is treated as unset.
                return None
        else:
            return None

    def stream_name(self) -> str:
        """Return the stream name of the collection root ('.')"""
        return "."

    def writable(self) -> bool:
        """A top-level Collection is always writable"""
        return True

    @synchronized
    def known_past_version(
            self,
            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
    ) -> bool:
        """Indicate whether an API record for this collection has been seen before

        As this collection object loads records from the API server, it records
        their `modified_at` and `portable_data_hash` fields. This method accepts
        a 2-tuple with values for those fields, and returns `True` if the
        combination was previously loaded.
        """
        return modified_at_and_portable_data_hash in self._past_versions

    @synchronized
    @retry_method
    def update(
            self,
            other: Optional['Collection']=None,
            num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it. If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
1249 """ 1250 if other is None: 1251 if self._manifest_locator is None: 1252 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1253 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1254 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and 1255 response.get("portable_data_hash") != self.portable_data_hash()): 1256 # The record on the server is different from our current one, but we've seen it before, 1257 # so ignore it because it's already been merged. 1258 # However, if it's the same as our current record, proceed with the update, because we want to update 1259 # our tokens. 1260 return 1261 else: 1262 self._remember_api_response(response) 1263 other = CollectionReader(response["manifest_text"]) 1264 baseline = CollectionReader(self._manifest_text) 1265 self.apply(baseline.diff(other)) 1266 self._manifest_text = self.manifest_text() 1267 1268 @synchronized 1269 def _my_api(self): 1270 if self._api_client is None: 1271 self._api_client = ThreadSafeApiCache(self._config, version='v1') 1272 if self._keep_client is None: 1273 self._keep_client = self._api_client.keep 1274 return self._api_client 1275 1276 @synchronized 1277 def _my_keep(self): 1278 if self._keep_client is None: 1279 if self._api_client is None: 1280 self._my_api() 1281 else: 1282 self._keep_client = KeepClient(api_client=self._api_client) 1283 return self._keep_client 1284 1285 @synchronized 1286 def _my_block_manager(self): 1287 if self._block_manager is None: 1288 copies = (self.replication_desired or 1289 self._my_api()._rootDesc.get('defaultCollectionReplication', 1290 2)) 1291 self._block_manager = _BlockManager(self._my_keep(), 1292 copies=copies, 1293 put_threads=self.put_threads, 1294 num_retries=self.num_retries, 1295 storage_classes_func=self.storage_classes_desired) 1296 return self._block_manager 1297 1298 def _remember_api_response(self, 
response): 1299 self._api_response = response 1300 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) 1301 1302 def _populate_from_api_server(self): 1303 # As in KeepClient itself, we must wait until the last 1304 # possible moment to instantiate an API client, in order to 1305 # avoid tripping up clients that don't have access to an API 1306 # server. If we do build one, make sure our Keep client uses 1307 # it. If instantiation fails, we'll fall back to the except 1308 # clause, just like any other Collection lookup 1309 # failure. Return an exception, or None if successful. 1310 self._remember_api_response(self._my_api().collections().get( 1311 uuid=self._manifest_locator).execute( 1312 num_retries=self.num_retries)) 1313 self._manifest_text = self._api_response['manifest_text'] 1314 self._portable_data_hash = self._api_response['portable_data_hash'] 1315 # If not overriden via kwargs, we should try to load the 1316 # replication_desired and storage_classes_desired from the API server 1317 if self.replication_desired is None: 1318 self.replication_desired = self._api_response.get('replication_desired', None) 1319 if self._storage_classes_desired is None: 1320 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None) 1321 1322 def _populate(self): 1323 if self._manifest_text is None: 1324 if self._manifest_locator is None: 1325 return 1326 else: 1327 self._populate_from_api_server() 1328 self._baseline_manifest = self._manifest_text 1329 self._import_manifest(self._manifest_text) 1330 1331 def _has_collection_uuid(self): 1332 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) 1333 1334 def _has_local_collection_uuid(self): 1335 return self._has_collection_uuid and \ 1336 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0] 1337 1338 def __enter__(self): 1339 return self 1340 1341 def __exit__(self, 
exc_type, exc_value, traceback): 1342 """Exit a context with this collection instance 1343 1344 If no exception was raised inside the context block, and this 1345 collection is writable and has a corresponding API record, that 1346 record will be updated to match the state of this instance at the end 1347 of the block. 1348 """ 1349 if exc_type is None: 1350 if self.writable() and self._has_collection_uuid(): 1351 self.save() 1352 self.stop_threads() 1353 1354 def stop_threads(self) -> None: 1355 """Stop background Keep upload/download threads""" 1356 if self._block_manager is not None: 1357 self._block_manager.stop_threads() 1358 1359 @synchronized 1360 def manifest_locator(self) -> Optional[str]: 1361 """Get this collection's manifest locator, if any 1362 1363 * If this collection instance is associated with an API record with a 1364 UUID, return that. 1365 * Otherwise, if this collection instance was loaded from an API record 1366 by portable data hash, return that. 1367 * Otherwise, return `None`. 1368 """ 1369 return self._manifest_locator 1370 1371 @synchronized 1372 def clone( 1373 self, 1374 new_parent: Optional['Collection']=None, 1375 new_name: Optional[str]=None, 1376 readonly: bool=False, 1377 new_config: Optional[Mapping[str, str]]=None, 1378 ) -> 'Collection': 1379 """Create a Collection object with the same contents as this instance 1380 1381 This method creates a new Collection object with contents that match 1382 this instance's. The new collection will not be associated with any API 1383 record. 1384 1385 Arguments: 1386 1387 * new_parent: arvados.collection.Collection | None --- This value is 1388 passed to the new Collection's constructor as the `parent` 1389 argument. 1390 1391 * new_name: str | None --- This value is unused. 1392 1393 * readonly: bool --- If this value is true, this method constructs and 1394 returns a `CollectionReader`. Otherwise, it returns a mutable 1395 `Collection`. Default `False`. 

        * new_config: Mapping[str, str] | None --- This value is passed to the
          new Collection's constructor as `apiconfig`. If no value is provided,
          defaults to the configuration passed to this instance's constructor.
        """
        if new_config is None:
            new_config = self._config
        if readonly:
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
        else:
            newcollection = Collection(parent=new_parent, apiconfig=new_config)

        newcollection._clonefrom(self)
        return newcollection

    @synchronized
    def api_response(self) -> Optional[Dict[str, Any]]:
        """Get this instance's associated API record

        If this Collection instance has an associated API record, return it.
        Otherwise, return `None`.
        """
        return self._api_response

    def find_or_create(
            self,
            path: str,
            create_type: CreateType,
    ) -> CollectionItem:
        # "." refers to the collection root; strip a leading "./" so the
        # base-class lookup sees a plain relative path.
        if path == ".":
            return self
        else:
            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

    def find(self, path: str) -> CollectionItem:
        # Same "." / "./" normalization as find_or_create.
        if path == ".":
            return self
        else:
            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

    def remove(self, path: str, recursive: bool=False) -> None:
        # The root itself can never be removed.
        if path == ".":
            raise errors.ArgumentError("Cannot remove '.'")
        else:
            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)

    @must_be_writable
    @synchronized
    @retry_method
    def save(
            self,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            merge: bool=True,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ) -> str:
        """Save collection to an existing API record

        This method updates the instance's corresponding API record to match
        the instance's state. If this instance does not have a corresponding API
        record yet, raises `AssertionError`. (To create a new API record, use
        `Collection.save_new`.) This method returns the saved collection
        manifest.

        Arguments:

        * properties: dict[str, Any] | None --- If provided, the API record will
          be updated with these properties. Note this will completely replace
          any existing properties.

        * storage_classes: list[str] | None --- If provided, the API record will
          be updated with this value in the `storage_classes_desired` field.
          This value will also be saved on the instance and used for any
          changes that follow.

        * trash_at: datetime.datetime | None --- If provided, the API record
          will be updated with this value in the `trash_at` field.

        * merge: bool --- If `True` (the default), this method will first
          reload this collection's API record, and merge any new contents into
          this instance before saving changes. See `Collection.update` for
          details.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed to directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        # Build the record update body from the explicitly-provided fields.
        body={}
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            # NOTE(review): formats the naive timestamp and appends "Z";
            # presumably callers pass UTC — confirm.
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            # There are local modifications: upload blocks, optionally merge
            # server-side changes, and send the new manifest.
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            self._my_block_manager().commit_all()

            if merge:
                self.update()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
            ).execute(num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)
        elif body:
            # No content changes, but metadata fields were given: update
            # just those on the API record.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
            ).execute(num_retries=num_retries))

        return self._manifest_text


    @must_be_writable
    @synchronized
    @retry_method
    def save_new(
            self,
            name: Optional[str]=None,
            create_collection_record: bool=True,
            owner_uuid: Optional[str]=None,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            ensure_unique_name: bool=False,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ):
        """Save collection to a new API record

        This method finishes uploading new data blocks and (optionally)
        creates a new API collection record with the provided data. If a new
        record is created, this instance becomes associated with that record
        for future updates like `save()`. This method returns the saved
        collection manifest.

        Arguments:

        * name: str | None --- The `name` field to use on the new collection
          record. If not specified, a generic default name is generated.

        * create_collection_record: bool --- If `True` (the default), creates a
          collection record on the API server. If `False`, the method finishes
          all data uploads and only returns the resulting collection manifest
          without sending it to the API server.

        * owner_uuid: str | None --- The `owner_uuid` field to use on the
          new collection record.

        * properties: dict[str, Any] | None --- The `properties` field to use on
          the new collection record.

        * storage_classes: list[str] | None --- The
          `storage_classes_desired` field to use on the new collection record.

        * trash_at: datetime.datetime | None --- The `trash_at` field to use
          on the new collection record.

        * ensure_unique_name: bool --- This value is passed directly to the
          Arvados API when creating the collection record. If `True`, the API
          server may modify the submitted `name` to ensure the collection's
          `name`+`owner_uuid` combination is unique. If `False` (the default),
          if a collection already exists with this same `name`+`owner_uuid`
          combination, creating a collection record will raise a validation
          error.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed to directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
1614 """ 1615 if properties and type(properties) is not dict: 1616 raise errors.ArgumentError("properties must be dictionary type.") 1617 1618 if storage_classes and type(storage_classes) is not list: 1619 raise errors.ArgumentError("storage_classes must be list type.") 1620 1621 if trash_at and type(trash_at) is not datetime.datetime: 1622 raise errors.ArgumentError("trash_at must be datetime type.") 1623 1624 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1625 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1626 1627 if self._has_remote_blocks: 1628 # Copy any remote blocks to the local cluster. 1629 self._copy_remote_blocks(remote_blocks={}) 1630 self._has_remote_blocks = False 1631 1632 if storage_classes: 1633 self._storage_classes_desired = storage_classes 1634 1635 self._my_block_manager().commit_all() 1636 text = self.manifest_text(strip=False) 1637 1638 if create_collection_record: 1639 if name is None: 1640 name = "New collection" 1641 ensure_unique_name = True 1642 1643 body = {"manifest_text": text, 1644 "name": name, 1645 "replication_desired": self.replication_desired} 1646 if owner_uuid: 1647 body["owner_uuid"] = owner_uuid 1648 if properties: 1649 body["properties"] = properties 1650 if self.storage_classes_desired(): 1651 body["storage_classes_desired"] = self.storage_classes_desired() 1652 if trash_at: 1653 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1654 body["trash_at"] = t 1655 if preserve_version: 1656 body["preserve_version"] = preserve_version 1657 1658 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) 1659 text = self._api_response["manifest_text"] 1660 1661 self._manifest_locator = self._api_response["uuid"] 1662 self._portable_data_hash = self._api_response["portable_data_hash"] 1663 1664 self._manifest_text = text 1665 
self.set_committed(True) 1666 1667 return text 1668 1669 _token_re = re.compile(r'(\S+)(\s+|$)') 1670 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') 1671 _segment_re = re.compile(r'(\d+):(\d+):(\S+)') 1672 1673 def _unescape_manifest_path(self, path): 1674 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) 1675 1676 @synchronized 1677 def _import_manifest(self, manifest_text): 1678 """Import a manifest into a `Collection`. 1679 1680 :manifest_text: 1681 The manifest text to import from. 1682 1683 """ 1684 if len(self) > 0: 1685 raise ArgumentError("Can only import manifest into an empty collection") 1686 1687 STREAM_NAME = 0 1688 BLOCKS = 1 1689 SEGMENTS = 2 1690 1691 stream_name = None 1692 state = STREAM_NAME 1693 1694 for token_and_separator in self._token_re.finditer(manifest_text): 1695 tok = token_and_separator.group(1) 1696 sep = token_and_separator.group(2) 1697 1698 if state == STREAM_NAME: 1699 # starting a new stream 1700 stream_name = self._unescape_manifest_path(tok) 1701 blocks = [] 1702 segments = [] 1703 streamoffset = 0 1704 state = BLOCKS 1705 self.find_or_create(stream_name, COLLECTION) 1706 continue 1707 1708 if state == BLOCKS: 1709 block_locator = self._block_re.match(tok) 1710 if block_locator: 1711 blocksize = int(block_locator.group(1)) 1712 blocks.append(Range(tok, streamoffset, blocksize, 0)) 1713 streamoffset += blocksize 1714 else: 1715 state = SEGMENTS 1716 1717 if state == SEGMENTS: 1718 file_segment = self._segment_re.match(tok) 1719 if file_segment: 1720 pos = int(file_segment.group(1)) 1721 size = int(file_segment.group(2)) 1722 name = self._unescape_manifest_path(file_segment.group(3)) 1723 if name.split('/')[-1] == '.': 1724 # placeholder for persisting an empty directory, not a real file 1725 if len(name) > 2: 1726 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) 1727 else: 1728 filepath = os.path.join(stream_name, name) 1729 try: 1730 afile = self.find_or_create(filepath, 
FILE) 1731 except IOError as e: 1732 if e.errno == errno.ENOTDIR: 1733 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None 1734 else: 1735 raise e from None 1736 if isinstance(afile, ArvadosFile): 1737 afile.add_segment(blocks, pos, size) 1738 else: 1739 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) 1740 else: 1741 # error! 1742 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok) 1743 1744 if sep == "\n": 1745 stream_name = None 1746 state = STREAM_NAME 1747 1748 self.set_committed(True) 1749 1750 @synchronized 1751 def notify( 1752 self, 1753 event: ChangeType, 1754 collection: 'RichCollectionBase', 1755 name: str, 1756 item: CollectionItem, 1757 ) -> None: 1758 if self._callback: 1759 self._callback(event, collection, name, item) 1760 1761 1762class Subcollection(RichCollectionBase): 1763 """Read and manipulate a stream/directory within an Arvados collection 1764 1765 This class represents a single stream (like a directory) within an Arvados 1766 `Collection`. It is returned by `Collection.find` and provides the same API. 1767 Operations that work on the API collection record propagate to the parent 1768 `Collection` object. 
1769 """ 1770 1771 def __init__(self, parent, name): 1772 super(Subcollection, self).__init__(parent) 1773 self.lock = self.root_collection().lock 1774 self._manifest_text = None 1775 self.name = name 1776 self.num_retries = parent.num_retries 1777 1778 def root_collection(self) -> 'Collection': 1779 return self.parent.root_collection() 1780 1781 def writable(self) -> bool: 1782 return self.root_collection().writable() 1783 1784 def _my_api(self): 1785 return self.root_collection()._my_api() 1786 1787 def _my_keep(self): 1788 return self.root_collection()._my_keep() 1789 1790 def _my_block_manager(self): 1791 return self.root_collection()._my_block_manager() 1792 1793 def stream_name(self) -> str: 1794 return os.path.join(self.parent.stream_name(), self.name) 1795 1796 @synchronized 1797 def clone( 1798 self, 1799 new_parent: Optional['Collection']=None, 1800 new_name: Optional[str]=None, 1801 ) -> 'Subcollection': 1802 c = Subcollection(new_parent, new_name) 1803 c._clonefrom(self) 1804 return c 1805 1806 @must_be_writable 1807 @synchronized 1808 def _reparent(self, newparent, newname): 1809 self.set_committed(False) 1810 self.flush() 1811 self.parent.remove(self.name, recursive=True) 1812 self.parent = newparent 1813 self.name = newname 1814 self.lock = self.parent.root_collection().lock 1815 1816 @synchronized 1817 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 1818 """Encode empty directories by using an \056-named (".") empty file""" 1819 if len(self._items) == 0: 1820 return "%s %s 0:0:\\056\n" % ( 1821 escape(stream_name), config.EMPTY_BLOCK_LOCATOR) 1822 return super(Subcollection, self)._get_manifest_text(stream_name, 1823 strip, normalize, 1824 only_committed) 1825 1826 1827class CollectionReader(Collection): 1828 """Read-only `Collection` subclass 1829 1830 This class will never create or update any API collection records. 
You can 1831 use this class for additional code safety when you only need to read 1832 existing collections. 1833 """ 1834 def __init__(self, manifest_locator_or_text, *args, **kwargs): 1835 self._in_init = True 1836 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs) 1837 self._in_init = False 1838 1839 # Forego any locking since it should never change once initialized. 1840 self.lock = NoopLock() 1841 1842 # Backwards compatability with old CollectionReader 1843 # all_streams() and all_files() 1844 self._streams = None 1845 1846 def writable(self) -> bool: 1847 return self._in_init 1848 1849 def _populate_streams(orig_func): 1850 @functools.wraps(orig_func) 1851 def populate_streams_wrapper(self, *args, **kwargs): 1852 # Defer populating self._streams until needed since it creates a copy of the manifest. 1853 if self._streams is None: 1854 if self._manifest_text: 1855 self._streams = [sline.split() 1856 for sline in self._manifest_text.split("\n") 1857 if sline] 1858 else: 1859 self._streams = [] 1860 return orig_func(self, *args, **kwargs) 1861 return populate_streams_wrapper 1862 1863 @arvados.util._deprecated('3.0', 'Collection iteration') 1864 @_populate_streams 1865 def normalize(self): 1866 """Normalize the streams returned by `all_streams`""" 1867 streams = {} 1868 for s in self.all_streams(): 1869 for f in s.all_files(): 1870 streamname, filename = split(s.name() + "/" + f.name()) 1871 if streamname not in streams: 1872 streams[streamname] = {} 1873 if filename not in streams[streamname]: 1874 streams[streamname][filename] = [] 1875 for r in f.segments: 1876 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size)) 1877 1878 self._streams = [normalize_stream(s, streams[s]) 1879 for s in sorted(streams)] 1880 1881 @arvados.util._deprecated('3.0', 'Collection iteration') 1882 @_populate_streams 1883 def all_streams(self): 1884 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries) 1885 
for s in self._streams] 1886 1887 @arvados.util._deprecated('3.0', 'Collection iteration') 1888 @_populate_streams 1889 def all_files(self): 1890 for s in self.all_streams(): 1891 for f in s.all_files(): 1892 yield f 1893 1894 1895class CollectionWriter(CollectionBase): 1896 """Create a new collection from scratch 1897 1898 .. WARNING:: Deprecated 1899 This class is deprecated. Prefer `arvados.collection.Collection` 1900 instead. 1901 """ 1902 1903 @arvados.util._deprecated('3.0', 'arvados.collection.Collection') 1904 def __init__(self, api_client=None, num_retries=0, replication=None): 1905 """Instantiate a CollectionWriter. 1906 1907 CollectionWriter lets you build a new Arvados Collection from scratch. 1908 Write files to it. The CollectionWriter will upload data to Keep as 1909 appropriate, and provide you with the Collection manifest text when 1910 you're finished. 1911 1912 Arguments: 1913 * api_client: The API client to use to look up Collections. If not 1914 provided, CollectionReader will build one from available Arvados 1915 configuration. 1916 * num_retries: The default number of times to retry failed 1917 service requests. Default 0. You may change this value 1918 after instantiation, but note those changes may not 1919 propagate to related objects like the Keep client. 1920 * replication: The number of copies of each block to store. 1921 If this argument is None or not supplied, replication is 1922 the server-provided default if available, otherwise 2. 1923 """ 1924 self._api_client = api_client 1925 self.num_retries = num_retries 1926 self.replication = (2 if replication is None else replication) 1927 self._keep_client = None 1928 self._data_buffer = [] 1929 self._data_buffer_len = 0 1930 self._current_stream_files = [] 1931 self._current_stream_length = 0 1932 self._current_stream_locators = [] 1933 self._current_stream_name = '.' 
1934 self._current_file_name = None 1935 self._current_file_pos = 0 1936 self._finished_streams = [] 1937 self._close_file = None 1938 self._queued_file = None 1939 self._queued_dirents = deque() 1940 self._queued_trees = deque() 1941 self._last_open = None 1942 1943 def __exit__(self, exc_type, exc_value, traceback): 1944 if exc_type is None: 1945 self.finish() 1946 1947 def do_queued_work(self): 1948 # The work queue consists of three pieces: 1949 # * _queued_file: The file object we're currently writing to the 1950 # Collection. 1951 # * _queued_dirents: Entries under the current directory 1952 # (_queued_trees[0]) that we want to write or recurse through. 1953 # This may contain files from subdirectories if 1954 # max_manifest_depth == 0 for this directory. 1955 # * _queued_trees: Directories that should be written as separate 1956 # streams to the Collection. 1957 # This function handles the smallest piece of work currently queued 1958 # (current file, then current directory, then next directory) until 1959 # no work remains. The _work_THING methods each do a unit of work on 1960 # THING. _queue_THING methods add a THING to the work queue. 
1961 while True: 1962 if self._queued_file: 1963 self._work_file() 1964 elif self._queued_dirents: 1965 self._work_dirents() 1966 elif self._queued_trees: 1967 self._work_trees() 1968 else: 1969 break 1970 1971 def _work_file(self): 1972 while True: 1973 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE) 1974 if not buf: 1975 break 1976 self.write(buf) 1977 self.finish_current_file() 1978 if self._close_file: 1979 self._queued_file.close() 1980 self._close_file = None 1981 self._queued_file = None 1982 1983 def _work_dirents(self): 1984 path, stream_name, max_manifest_depth = self._queued_trees[0] 1985 if stream_name != self.current_stream_name(): 1986 self.start_new_stream(stream_name) 1987 while self._queued_dirents: 1988 dirent = self._queued_dirents.popleft() 1989 target = os.path.join(path, dirent) 1990 if os.path.isdir(target): 1991 self._queue_tree(target, 1992 os.path.join(stream_name, dirent), 1993 max_manifest_depth - 1) 1994 else: 1995 self._queue_file(target, dirent) 1996 break 1997 if not self._queued_dirents: 1998 self._queued_trees.popleft() 1999 2000 def _work_trees(self): 2001 path, stream_name, max_manifest_depth = self._queued_trees[0] 2002 d = arvados.util.listdir_recursive( 2003 path, max_depth = (None if max_manifest_depth == 0 else 0)) 2004 if d: 2005 self._queue_dirents(stream_name, d) 2006 else: 2007 self._queued_trees.popleft() 2008 2009 def _queue_file(self, source, filename=None): 2010 assert (self._queued_file is None), "tried to queue more than one file" 2011 if not hasattr(source, 'read'): 2012 source = open(source, 'rb') 2013 self._close_file = True 2014 else: 2015 self._close_file = False 2016 if filename is None: 2017 filename = os.path.basename(source.name) 2018 self.start_new_file(filename) 2019 self._queued_file = source 2020 2021 def _queue_dirents(self, stream_name, dirents): 2022 assert (not self._queued_dirents), "tried to queue more than one tree" 2023 self._queued_dirents = deque(sorted(dirents)) 2024 2025 def 
_queue_tree(self, path, stream_name, max_manifest_depth): 2026 self._queued_trees.append((path, stream_name, max_manifest_depth)) 2027 2028 def write_file(self, source, filename=None): 2029 self._queue_file(source, filename) 2030 self.do_queued_work() 2031 2032 def write_directory_tree(self, 2033 path, stream_name='.', max_manifest_depth=-1): 2034 self._queue_tree(path, stream_name, max_manifest_depth) 2035 self.do_queued_work() 2036 2037 def write(self, newdata): 2038 if isinstance(newdata, bytes): 2039 pass 2040 elif isinstance(newdata, str): 2041 newdata = newdata.encode() 2042 elif hasattr(newdata, '__iter__'): 2043 for s in newdata: 2044 self.write(s) 2045 return 2046 self._data_buffer.append(newdata) 2047 self._data_buffer_len += len(newdata) 2048 self._current_stream_length += len(newdata) 2049 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE: 2050 self.flush_data() 2051 2052 def open(self, streampath, filename=None): 2053 """open(streampath[, filename]) -> file-like object 2054 2055 Pass in the path of a file to write to the Collection, either as a 2056 single string or as two separate stream name and file name arguments. 2057 This method returns a file-like object you can write to add it to the 2058 Collection. 2059 2060 You may only have one file object from the Collection open at a time, 2061 so be sure to close the object when you're done. 
Using the object in 2062 a with statement makes that easy: 2063 2064 with cwriter.open('./doc/page1.txt') as outfile: 2065 outfile.write(page1_data) 2066 with cwriter.open('./doc/page2.txt') as outfile: 2067 outfile.write(page2_data) 2068 """ 2069 if filename is None: 2070 streampath, filename = split(streampath) 2071 if self._last_open and not self._last_open.closed: 2072 raise errors.AssertionError( 2073 u"can't open '{}' when '{}' is still open".format( 2074 filename, self._last_open.name)) 2075 if streampath != self.current_stream_name(): 2076 self.start_new_stream(streampath) 2077 self.set_current_file_name(filename) 2078 self._last_open = _WriterFile(self, filename) 2079 return self._last_open 2080 2081 def flush_data(self): 2082 data_buffer = b''.join(self._data_buffer) 2083 if data_buffer: 2084 self._current_stream_locators.append( 2085 self._my_keep().put( 2086 data_buffer[0:config.KEEP_BLOCK_SIZE], 2087 copies=self.replication)) 2088 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]] 2089 self._data_buffer_len = len(self._data_buffer[0]) 2090 2091 def start_new_file(self, newfilename=None): 2092 self.finish_current_file() 2093 self.set_current_file_name(newfilename) 2094 2095 def set_current_file_name(self, newfilename): 2096 if re.search(r'[\t\n]', newfilename): 2097 raise errors.AssertionError( 2098 "Manifest filenames cannot contain whitespace: %s" % 2099 newfilename) 2100 elif re.search(r'\x00', newfilename): 2101 raise errors.AssertionError( 2102 "Manifest filenames cannot contain NUL characters: %s" % 2103 newfilename) 2104 self._current_file_name = newfilename 2105 2106 def current_file_name(self): 2107 return self._current_file_name 2108 2109 def finish_current_file(self): 2110 if self._current_file_name is None: 2111 if self._current_file_pos == self._current_stream_length: 2112 return 2113 raise errors.AssertionError( 2114 "Cannot finish an unnamed file " + 2115 "(%d bytes at offset %d in '%s' stream)" % 2116 (self._current_stream_length 
- self._current_file_pos, 2117 self._current_file_pos, 2118 self._current_stream_name)) 2119 self._current_stream_files.append([ 2120 self._current_file_pos, 2121 self._current_stream_length - self._current_file_pos, 2122 self._current_file_name]) 2123 self._current_file_pos = self._current_stream_length 2124 self._current_file_name = None 2125 2126 def start_new_stream(self, newstreamname='.'): 2127 self.finish_current_stream() 2128 self.set_current_stream_name(newstreamname) 2129 2130 def set_current_stream_name(self, newstreamname): 2131 if re.search(r'[\t\n]', newstreamname): 2132 raise errors.AssertionError( 2133 "Manifest stream names cannot contain whitespace: '%s'" % 2134 (newstreamname)) 2135 self._current_stream_name = '.' if newstreamname=='' else newstreamname 2136 2137 def current_stream_name(self): 2138 return self._current_stream_name 2139 2140 def finish_current_stream(self): 2141 self.finish_current_file() 2142 self.flush_data() 2143 if not self._current_stream_files: 2144 pass 2145 elif self._current_stream_name is None: 2146 raise errors.AssertionError( 2147 "Cannot finish an unnamed stream (%d bytes in %d files)" % 2148 (self._current_stream_length, len(self._current_stream_files))) 2149 else: 2150 if not self._current_stream_locators: 2151 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR) 2152 self._finished_streams.append([self._current_stream_name, 2153 self._current_stream_locators, 2154 self._current_stream_files]) 2155 self._current_stream_files = [] 2156 self._current_stream_length = 0 2157 self._current_stream_locators = [] 2158 self._current_stream_name = None 2159 self._current_file_pos = 0 2160 self._current_file_name = None 2161 2162 def finish(self): 2163 """Store the manifest in Keep and return its locator. 2164 2165 This is useful for storing manifest fragments (task outputs) 2166 temporarily in Keep during a Crunch job. 
2167 2168 In other cases you should make a collection instead, by 2169 sending manifest_text() to the API server's "create 2170 collection" endpoint. 2171 """ 2172 return self._my_keep().put(self.manifest_text().encode(), 2173 copies=self.replication) 2174 2175 def portable_data_hash(self): 2176 stripped = self.stripped_manifest().encode() 2177 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) 2178 2179 def manifest_text(self): 2180 self.finish_current_stream() 2181 manifest = '' 2182 2183 for stream in self._finished_streams: 2184 if not re.search(r'^\.(/.*)?$', stream[0]): 2185 manifest += './' 2186 manifest += stream[0].replace(' ', '\\040') 2187 manifest += ' ' + ' '.join(stream[1]) 2188 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2]) 2189 manifest += "\n" 2190 2191 return manifest 2192 2193 def data_locators(self): 2194 ret = [] 2195 for name, locators, files in self._finished_streams: 2196 ret += locators 2197 return ret 2198 2199 def save_new(self, name=None): 2200 return self._api_client.collections().create( 2201 ensure_unique_name=True, 2202 body={ 2203 'name': name, 2204 'manifest_text': self.manifest_text(), 2205 }).execute(num_retries=self.num_retries) 2206 2207 2208class ResumableCollectionWriter(CollectionWriter): 2209 """CollectionWriter that can serialize internal state to disk 2210 2211 .. WARNING:: Deprecated 2212 This class is deprecated. Prefer `arvados.collection.Collection` 2213 instead. 
2214 """ 2215 2216 STATE_PROPS = ['_current_stream_files', '_current_stream_length', 2217 '_current_stream_locators', '_current_stream_name', 2218 '_current_file_name', '_current_file_pos', '_close_file', 2219 '_data_buffer', '_dependencies', '_finished_streams', 2220 '_queued_dirents', '_queued_trees'] 2221 2222 @arvados.util._deprecated('3.0', 'arvados.collection.Collection') 2223 def __init__(self, api_client=None, **kwargs): 2224 self._dependencies = {} 2225 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs) 2226 2227 @classmethod 2228 def from_state(cls, state, *init_args, **init_kwargs): 2229 # Try to build a new writer from scratch with the given state. 2230 # If the state is not suitable to resume (because files have changed, 2231 # been deleted, aren't predictable, etc.), raise a 2232 # StaleWriterStateError. Otherwise, return the initialized writer. 2233 # The caller is responsible for calling writer.do_queued_work() 2234 # appropriately after it's returned. 2235 writer = cls(*init_args, **init_kwargs) 2236 for attr_name in cls.STATE_PROPS: 2237 attr_value = state[attr_name] 2238 attr_class = getattr(writer, attr_name).__class__ 2239 # Coerce the value into the same type as the initial value, if 2240 # needed. 2241 if attr_class not in (type(None), attr_value.__class__): 2242 attr_value = attr_class(attr_value) 2243 setattr(writer, attr_name, attr_value) 2244 # Check dependencies before we try to resume anything. 
2245 if any(KeepLocator(ls).permission_expired() 2246 for ls in writer._current_stream_locators): 2247 raise errors.StaleWriterStateError( 2248 "locators include expired permission hint") 2249 writer.check_dependencies() 2250 if state['_current_file'] is not None: 2251 path, pos = state['_current_file'] 2252 try: 2253 writer._queued_file = open(path, 'rb') 2254 writer._queued_file.seek(pos) 2255 except IOError as error: 2256 raise errors.StaleWriterStateError( 2257 u"failed to reopen active file {}: {}".format(path, error)) 2258 return writer 2259 2260 def check_dependencies(self): 2261 for path, orig_stat in listitems(self._dependencies): 2262 if not S_ISREG(orig_stat[ST_MODE]): 2263 raise errors.StaleWriterStateError(u"{} not file".format(path)) 2264 try: 2265 now_stat = tuple(os.stat(path)) 2266 except OSError as error: 2267 raise errors.StaleWriterStateError( 2268 u"failed to stat {}: {}".format(path, error)) 2269 if ((not S_ISREG(now_stat[ST_MODE])) or 2270 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or 2271 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])): 2272 raise errors.StaleWriterStateError(u"{} changed".format(path)) 2273 2274 def dump_state(self, copy_func=lambda x: x): 2275 state = {attr: copy_func(getattr(self, attr)) 2276 for attr in self.STATE_PROPS} 2277 if self._queued_file is None: 2278 state['_current_file'] = None 2279 else: 2280 state['_current_file'] = (os.path.realpath(self._queued_file.name), 2281 self._queued_file.tell()) 2282 return state 2283 2284 def _queue_file(self, source, filename=None): 2285 try: 2286 src_path = os.path.realpath(source) 2287 except Exception: 2288 raise errors.AssertionError(u"{} not a file path".format(source)) 2289 try: 2290 path_stat = os.stat(src_path) 2291 except OSError as stat_error: 2292 path_stat = None 2293 super(ResumableCollectionWriter, self)._queue_file(source, filename) 2294 fd_stat = os.fstat(self._queued_file.fileno()) 2295 if not S_ISREG(fd_stat.st_mode): 2296 # We won't be able to resume from this 
cache anyway, so don't 2297 # worry about further checks. 2298 self._dependencies[source] = tuple(fd_stat) 2299 elif path_stat is None: 2300 raise errors.AssertionError( 2301 u"could not stat {}: {}".format(source, stat_error)) 2302 elif path_stat.st_ino != fd_stat.st_ino: 2303 raise errors.AssertionError( 2304 u"{} changed between open and stat calls".format(source)) 2305 else: 2306 self._dependencies[src_path] = tuple(fd_stat) 2307 2308 def write(self, data): 2309 if self._queued_file is None: 2310 raise errors.AssertionError( 2311 "resumable writer can't accept unsourced data") 2312 return super(ResumableCollectionWriter, self).write(data)
Argument value for Collection
methods to represent an added item
Argument value for Collection
methods to represent a removed item
Argument value for Collection
methods to represent a modified item
Argument value for Collection
methods to represent an item with token differences
create_type
value for Collection.find_or_create
create_type
value for Collection.find_or_create
class CollectionBase(object):
    """Abstract base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __enter__(self):
        """Enter a context block with this collection instance"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context block with this collection instance"""
        pass

    def _my_keep(self):
        # Build a KeepClient on first use and cache it on the instance.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self) -> str:
        """Create a copy of the collection manifest with only size hints

        This method returns a string with the current collection's manifest
        text with all non-portable locator hints like permission hints and
        remote cluster hints removed. The only hints in the returned manifest
        will be size hints.
        """
        out = []
        for line in self.manifest_text().split("\n"):
            fields = line.split()
            if not fields:
                continue
            cleaned = [fields[0]]
            for tok in fields[1:]:
                if re.match(arvados.util.keep_locator_pattern, tok):
                    # Drop every hint that does not start with a digit,
                    # i.e. everything except the +<size> hint.
                    tok = re.sub(r'\+[^\d][^\+]*', '', tok)
                cleaned.append(tok)
            out.append(' '.join(cleaned))
            out.append("\n")
        return ''.join(out)
Abstract base class for Collection classes
116 def stripped_manifest(self) -> str: 117 """Create a copy of the collection manifest with only size hints 118 119 This method returns a string with the current collection's manifest 120 text with all non-portable locator hints like permission hints and 121 remote cluster hints removed. The only hints in the returned manifest 122 will be size hints. 123 """ 124 raw = self.manifest_text() 125 clean = [] 126 for line in raw.split("\n"): 127 fields = line.split() 128 if fields: 129 clean_fields = fields[:1] + [ 130 (re.sub(r'\+[^\d][^\+]*', '', x) 131 if re.match(arvados.util.keep_locator_pattern, x) 132 else x) 133 for x in fields[1:]] 134 clean += [' '.join(clean_fields), "\n"] 135 return ''.join(clean)
Create a copy of the collection manifest with only size hints
This method returns a string with the current collection’s manifest text with all non-portable locator hints like permission hints and remote cluster hints removed. The only hints in the returned manifest will be size hints.
161class RichCollectionBase(CollectionBase): 162 """Base class for Collection classes 163 164 .. ATTENTION:: Internal 165 This class is meant to be used by other parts of the SDK. User code 166 should instantiate or subclass `Collection` or one of its subclasses 167 directly. 168 """ 169 170 def __init__(self, parent=None): 171 self.parent = parent 172 self._committed = False 173 self._has_remote_blocks = False 174 self._callback = None 175 self._items = {} 176 177 def _my_api(self): 178 raise NotImplementedError() 179 180 def _my_keep(self): 181 raise NotImplementedError() 182 183 def _my_block_manager(self): 184 raise NotImplementedError() 185 186 def writable(self) -> bool: 187 """Indicate whether this collection object can be modified 188 189 This method returns `False` if this object is a `CollectionReader`, 190 else `True`. 191 """ 192 raise NotImplementedError() 193 194 def root_collection(self) -> 'Collection': 195 """Get this collection's root collection object 196 197 If you open a subcollection with `Collection.find`, calling this method 198 on that subcollection returns the source Collection object. 199 """ 200 raise NotImplementedError() 201 202 def stream_name(self) -> str: 203 """Get the name of the manifest stream represented by this collection 204 205 If you open a subcollection with `Collection.find`, calling this method 206 on that subcollection returns the name of the stream you opened. 207 """ 208 raise NotImplementedError() 209 210 @synchronized 211 def has_remote_blocks(self) -> bool: 212 """Indiciate whether the collection refers to remote data 213 214 Returns `True` if the collection manifest includes any Keep locators 215 with a remote hint (`+R`), else `False`. 
216 """ 217 if self._has_remote_blocks: 218 return True 219 for item in self: 220 if self[item].has_remote_blocks(): 221 return True 222 return False 223 224 @synchronized 225 def set_has_remote_blocks(self, val: bool) -> None: 226 """Cache whether this collection refers to remote blocks 227 228 .. ATTENTION:: Internal 229 This method is only meant to be used by other Collection methods. 230 231 Set this collection's cached "has remote blocks" flag to the given 232 value. 233 """ 234 self._has_remote_blocks = val 235 if self.parent: 236 self.parent.set_has_remote_blocks(val) 237 238 @must_be_writable 239 @synchronized 240 def find_or_create( 241 self, 242 path: str, 243 create_type: CreateType, 244 ) -> CollectionItem: 245 """Get the item at the given path, creating it if necessary 246 247 If `path` refers to a stream in this collection, returns a 248 corresponding `Subcollection` object. If `path` refers to a file in 249 this collection, returns a corresponding 250 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 251 this collection, then this method creates a new object and returns 252 it, creating parent streams as needed. The type of object created is 253 determined by the value of `create_type`. 254 255 Arguments: 256 257 * path: str --- The path to find or create within this collection. 258 259 * create_type: Literal[COLLECTION, FILE] --- The type of object to 260 create at `path` if one does not exist. Passing `COLLECTION` 261 creates a stream and returns the corresponding 262 `Subcollection`. Passing `FILE` creates a new file and returns the 263 corresponding `arvados.arvfile.ArvadosFile`. 
264 """ 265 pathcomponents = path.split("/", 1) 266 if pathcomponents[0]: 267 item = self._items.get(pathcomponents[0]) 268 if len(pathcomponents) == 1: 269 if item is None: 270 # create new file 271 if create_type == COLLECTION: 272 item = Subcollection(self, pathcomponents[0]) 273 else: 274 item = ArvadosFile(self, pathcomponents[0]) 275 self._items[pathcomponents[0]] = item 276 self.set_committed(False) 277 self.notify(ADD, self, pathcomponents[0], item) 278 return item 279 else: 280 if item is None: 281 # create new collection 282 item = Subcollection(self, pathcomponents[0]) 283 self._items[pathcomponents[0]] = item 284 self.set_committed(False) 285 self.notify(ADD, self, pathcomponents[0], item) 286 if isinstance(item, RichCollectionBase): 287 return item.find_or_create(pathcomponents[1], create_type) 288 else: 289 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 290 else: 291 return self 292 293 @synchronized 294 def find(self, path: str) -> CollectionItem: 295 """Get the item at the given path 296 297 If `path` refers to a stream in this collection, returns a 298 corresponding `Subcollection` object. If `path` refers to a file in 299 this collection, returns a corresponding 300 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 301 this collection, then this method raises `NotADirectoryError`. 302 303 Arguments: 304 305 * path: str --- The path to find or create within this collection. 
306 """ 307 if not path: 308 raise errors.ArgumentError("Parameter 'path' is empty.") 309 310 pathcomponents = path.split("/", 1) 311 if pathcomponents[0] == '': 312 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 313 314 item = self._items.get(pathcomponents[0]) 315 if item is None: 316 return None 317 elif len(pathcomponents) == 1: 318 return item 319 else: 320 if isinstance(item, RichCollectionBase): 321 if pathcomponents[1]: 322 return item.find(pathcomponents[1]) 323 else: 324 return item 325 else: 326 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 327 328 @synchronized 329 def mkdirs(self, path: str) -> 'Subcollection': 330 """Create and return a subcollection at `path` 331 332 If `path` exists within this collection, raises `FileExistsError`. 333 Otherwise, creates a stream at that path and returns the 334 corresponding `Subcollection`. 335 """ 336 if self.find(path) != None: 337 raise IOError(errno.EEXIST, "Directory or file exists", path) 338 339 return self.find_or_create(path, COLLECTION) 340 341 def open( 342 self, 343 path: str, 344 mode: str="r", 345 encoding: Optional[str]=None, 346 ) -> IO: 347 """Open a file-like object within the collection 348 349 This method returns a file-like object that can read and/or write the 350 file located at `path` within the collection. If you attempt to write 351 a `path` that does not exist, the file is created with `find_or_create`. 352 If the file cannot be opened for any other reason, this method raises 353 `OSError` with an appropriate errno. 354 355 Arguments: 356 357 * path: str --- The path of the file to open within this collection 358 359 * mode: str --- The mode to open this file. Supports all the same 360 values as `builtins.open`. 361 362 * encoding: str | None --- The text encoding of the file. Only used 363 when the file is opened in text mode. The default is 364 platform-dependent. 
365 """ 366 if not re.search(r'^[rwa][bt]?\+?$', mode): 367 raise errors.ArgumentError("Invalid mode {!r}".format(mode)) 368 369 if mode[0] == 'r' and '+' not in mode: 370 fclass = ArvadosFileReader 371 arvfile = self.find(path) 372 elif not self.writable(): 373 raise IOError(errno.EROFS, "Collection is read only") 374 else: 375 fclass = ArvadosFileWriter 376 arvfile = self.find_or_create(path, FILE) 377 378 if arvfile is None: 379 raise IOError(errno.ENOENT, "File not found", path) 380 if not isinstance(arvfile, ArvadosFile): 381 raise IOError(errno.EISDIR, "Is a directory", path) 382 383 if mode[0] == 'w': 384 arvfile.truncate(0) 385 386 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) 387 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) 388 if 'b' not in mode: 389 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader 390 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) 391 return f 392 393 def modified(self) -> bool: 394 """Indicate whether this collection has an API server record 395 396 Returns `False` if this collection corresponds to a record loaded from 397 the API server, `True` otherwise. 398 """ 399 return not self.committed() 400 401 @synchronized 402 def committed(self): 403 """Indicate whether this collection has an API server record 404 405 Returns `True` if this collection corresponds to a record loaded from 406 the API server, `False` otherwise. 407 """ 408 return self._committed 409 410 @synchronized 411 def set_committed(self, value: bool=True): 412 """Cache whether this collection has an API server record 413 414 .. ATTENTION:: Internal 415 This method is only meant to be used by other Collection methods. 416 417 Set this collection's cached "committed" flag to the given 418 value and propagates it as needed. 
419 """ 420 if value == self._committed: 421 return 422 if value: 423 for k,v in listitems(self._items): 424 v.set_committed(True) 425 self._committed = True 426 else: 427 self._committed = False 428 if self.parent is not None: 429 self.parent.set_committed(False) 430 431 @synchronized 432 def __iter__(self) -> Iterator[str]: 433 """Iterate names of streams and files in this collection 434 435 This method does not recurse. It only iterates the contents of this 436 collection's corresponding stream. 437 """ 438 return iter(viewkeys(self._items)) 439 440 @synchronized 441 def __getitem__(self, k: str) -> CollectionItem: 442 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection 443 444 This method does not recurse. If you want to search a path, use 445 `RichCollectionBase.find` instead. 446 """ 447 return self._items[k] 448 449 @synchronized 450 def __contains__(self, k: str) -> bool: 451 """Indicate whether this collection has an item with this name 452 453 This method does not recurse. It you want to check a path, use 454 `RichCollectionBase.exists` instead. 455 """ 456 return k in self._items 457 458 @synchronized 459 def __len__(self): 460 """Get the number of items directly contained in this collection 461 462 This method does not recurse. It only counts the streams and files 463 in this collection's corresponding stream. 464 """ 465 return len(self._items) 466 467 @must_be_writable 468 @synchronized 469 def __delitem__(self, p: str) -> None: 470 """Delete an item from this collection's stream 471 472 This method does not recurse. If you want to remove an item by a 473 path, use `RichCollectionBase.remove` instead. 474 """ 475 del self._items[p] 476 self.set_committed(False) 477 self.notify(DEL, self, p, None) 478 479 @synchronized 480 def keys(self) -> Iterator[str]: 481 """Iterate names of streams and files in this collection 482 483 This method does not recurse. It only iterates the contents of this 484 collection's corresponding stream. 
485 """ 486 return self._items.keys() 487 488 @synchronized 489 def values(self) -> List[CollectionItem]: 490 """Get a list of objects in this collection's stream 491 492 The return value includes a `Subcollection` for every stream, and an 493 `arvados.arvfile.ArvadosFile` for every file, directly within this 494 collection's stream. This method does not recurse. 495 """ 496 return listvalues(self._items) 497 498 @synchronized 499 def items(self) -> List[Tuple[str, CollectionItem]]: 500 """Get a list of `(name, object)` tuples from this collection's stream 501 502 The return value includes a `Subcollection` for every stream, and an 503 `arvados.arvfile.ArvadosFile` for every file, directly within this 504 collection's stream. This method does not recurse. 505 """ 506 return listitems(self._items) 507 508 def exists(self, path: str) -> bool: 509 """Indicate whether this collection includes an item at `path` 510 511 This method returns `True` if `path` refers to a stream or file within 512 this collection, else `False`. 513 514 Arguments: 515 516 * path: str --- The path to check for existence within this collection 517 """ 518 return self.find(path) is not None 519 520 @must_be_writable 521 @synchronized 522 def remove(self, path: str, recursive: bool=False) -> None: 523 """Remove the file or stream at `path` 524 525 Arguments: 526 527 * path: str --- The path of the item to remove from the collection 528 529 * recursive: bool --- Controls the method's behavior if `path` refers 530 to a nonempty stream. If `False` (the default), this method raises 531 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all 532 items under the stream. 
533 """ 534 if not path: 535 raise errors.ArgumentError("Parameter 'path' is empty.") 536 537 pathcomponents = path.split("/", 1) 538 item = self._items.get(pathcomponents[0]) 539 if item is None: 540 raise IOError(errno.ENOENT, "File not found", path) 541 if len(pathcomponents) == 1: 542 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: 543 raise IOError(errno.ENOTEMPTY, "Directory not empty", path) 544 deleteditem = self._items[pathcomponents[0]] 545 del self._items[pathcomponents[0]] 546 self.set_committed(False) 547 self.notify(DEL, self, pathcomponents[0], deleteditem) 548 else: 549 item.remove(pathcomponents[1], recursive=recursive) 550 551 def _clonefrom(self, source): 552 for k,v in listitems(source): 553 self._items[k] = v.clone(self, k) 554 555 def clone(self): 556 raise NotImplementedError() 557 558 @must_be_writable 559 @synchronized 560 def add( 561 self, 562 source_obj: CollectionItem, 563 target_name: str, 564 overwrite: bool=False, 565 reparent: bool=False, 566 ) -> None: 567 """Copy or move a file or subcollection object to this collection 568 569 Arguments: 570 571 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection 572 to add to this collection 573 574 * target_name: str --- The path inside this collection where 575 `source_obj` should be added. 576 577 * overwrite: bool --- Controls the behavior of this method when the 578 collection already contains an object at `target_name`. If `False` 579 (the default), this method will raise `FileExistsError`. If `True`, 580 the object at `target_name` will be replaced with `source_obj`. 581 582 * reparent: bool --- Controls whether this method copies or moves 583 `source_obj`. If `False` (the default), `source_obj` is copied into 584 this collection. If `True`, `source_obj` is moved into this 585 collection. 
586 """ 587 if target_name in self and not overwrite: 588 raise IOError(errno.EEXIST, "File already exists", target_name) 589 590 modified_from = None 591 if target_name in self: 592 modified_from = self[target_name] 593 594 # Actually make the move or copy. 595 if reparent: 596 source_obj._reparent(self, target_name) 597 item = source_obj 598 else: 599 item = source_obj.clone(self, target_name) 600 601 self._items[target_name] = item 602 self.set_committed(False) 603 if not self._has_remote_blocks and source_obj.has_remote_blocks(): 604 self.set_has_remote_blocks(True) 605 606 if modified_from: 607 self.notify(MOD, self, target_name, (modified_from, item)) 608 else: 609 self.notify(ADD, self, target_name, item) 610 611 def _get_src_target(self, source, target_path, source_collection, create_dest): 612 if source_collection is None: 613 source_collection = self 614 615 # Find the object 616 if isinstance(source, basestring): 617 source_obj = source_collection.find(source) 618 if source_obj is None: 619 raise IOError(errno.ENOENT, "File not found", source) 620 sourcecomponents = source.split("/") 621 else: 622 source_obj = source 623 sourcecomponents = None 624 625 # Find parent collection the target path 626 targetcomponents = target_path.split("/") 627 628 # Determine the name to use. 629 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1] 630 631 if not target_name: 632 raise errors.ArgumentError("Target path is empty and source is an object. 
Cannot determine destination filename to use.") 633 634 if create_dest: 635 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) 636 else: 637 if len(targetcomponents) > 1: 638 target_dir = self.find("/".join(targetcomponents[0:-1])) 639 else: 640 target_dir = self 641 642 if target_dir is None: 643 raise IOError(errno.ENOENT, "Target directory not found", target_name) 644 645 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents: 646 target_dir = target_dir[target_name] 647 target_name = sourcecomponents[-1] 648 649 return (source_obj, target_dir, target_name) 650 651 @must_be_writable 652 @synchronized 653 def copy( 654 self, 655 source: Union[str, CollectionItem], 656 target_path: str, 657 source_collection: Optional['RichCollectionBase']=None, 658 overwrite: bool=False, 659 ) -> None: 660 """Copy a file or subcollection object to this collection 661 662 Arguments: 663 664 * source: str | arvados.arvfile.ArvadosFile | 665 arvados.collection.Subcollection --- The file or subcollection to 666 add to this collection. If `source` is a str, the object will be 667 found by looking up this path from `source_collection` (see 668 below). 669 670 * target_path: str --- The path inside this collection where the 671 source object should be added. 672 673 * source_collection: arvados.collection.Collection | None --- The 674 collection to find the source object from when `source` is a 675 path. Defaults to the current collection (`self`). 676 677 * overwrite: bool --- Controls the behavior of this method when the 678 collection already contains an object at `target_path`. If `False` 679 (the default), this method will raise `FileExistsError`. If `True`, 680 the object at `target_path` will be replaced with `source_obj`. 
681 """ 682 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) 683 target_dir.add(source_obj, target_name, overwrite, False) 684 685 @must_be_writable 686 @synchronized 687 def rename( 688 self, 689 source: Union[str, CollectionItem], 690 target_path: str, 691 source_collection: Optional['RichCollectionBase']=None, 692 overwrite: bool=False, 693 ) -> None: 694 """Move a file or subcollection object to this collection 695 696 Arguments: 697 698 * source: str | arvados.arvfile.ArvadosFile | 699 arvados.collection.Subcollection --- The file or subcollection to 700 add to this collection. If `source` is a str, the object will be 701 found by looking up this path from `source_collection` (see 702 below). 703 704 * target_path: str --- The path inside this collection where the 705 source object should be added. 706 707 * source_collection: arvados.collection.Collection | None --- The 708 collection to find the source object from when `source` is a 709 path. Defaults to the current collection (`self`). 710 711 * overwrite: bool --- Controls the behavior of this method when the 712 collection already contains an object at `target_path`. If `False` 713 (the default), this method will raise `FileExistsError`. If `True`, 714 the object at `target_path` will be replaced with `source_obj`. 715 """ 716 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) 717 if not source_obj.writable(): 718 raise IOError(errno.EROFS, "Source collection is read only", source) 719 target_dir.add(source_obj, target_name, overwrite, True) 720 721 def portable_manifest_text(self, stream_name: str=".") -> str: 722 """Get the portable manifest text for this collection 723 724 The portable manifest text is normalized, and does not include access 725 tokens. This method does not flush outstanding blocks to Keep. 
726 727 Arguments: 728 729 * stream_name: str --- The name to use for this collection's stream in 730 the generated manifest. Default `'.'`. 731 """ 732 return self._get_manifest_text(stream_name, True, True) 733 734 @synchronized 735 def manifest_text( 736 self, 737 stream_name: str=".", 738 strip: bool=False, 739 normalize: bool=False, 740 only_committed: bool=False, 741 ) -> str: 742 """Get the manifest text for this collection 743 744 Arguments: 745 746 * stream_name: str --- The name to use for this collection's stream in 747 the generated manifest. Default `'.'`. 748 749 * strip: bool --- Controls whether or not the returned manifest text 750 includes access tokens. If `False` (the default), the manifest text 751 will include access tokens. If `True`, the manifest text will not 752 include access tokens. 753 754 * normalize: bool --- Controls whether or not the returned manifest 755 text is normalized. Default `False`. 756 757 * only_committed: bool --- Controls whether or not this method uploads 758 pending data to Keep before building and returning the manifest text. 759 If `False` (the default), this method will finish uploading all data 760 to Keep, then return the final manifest. If `True`, this method will 761 build and return a manifest that only refers to the data that has 762 finished uploading at the time this method was called. 763 """ 764 if not only_committed: 765 self._my_block_manager().commit_all() 766 return self._get_manifest_text(stream_name, strip, normalize, 767 only_committed=only_committed) 768 769 @synchronized 770 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 771 """Get the manifest text for this collection, sub collections and files. 772 773 :stream_name: 774 Name to use for this stream (directory) 775 776 :strip: 777 If True, remove signing tokens from block locators if present. 778 If False (default), block locators are left unchanged. 
779 780 :normalize: 781 If True, always export the manifest text in normalized form 782 even if the Collection is not modified. If False (default) and the collection 783 is not modified, return the original manifest text even if it is not 784 in normalized form. 785 786 :only_committed: 787 If True, only include blocks that were already committed to Keep. 788 789 """ 790 791 if not self.committed() or self._manifest_text is None or normalize: 792 stream = {} 793 buf = [] 794 sorted_keys = sorted(self.keys()) 795 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]: 796 # Create a stream per file `k` 797 arvfile = self[filename] 798 filestream = [] 799 for segment in arvfile.segments(): 800 loc = segment.locator 801 if arvfile.parent._my_block_manager().is_bufferblock(loc): 802 if only_committed: 803 continue 804 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator() 805 if strip: 806 loc = KeepLocator(loc).stripped() 807 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 808 segment.segment_offset, segment.range_size)) 809 stream[filename] = filestream 810 if stream: 811 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n") 812 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]: 813 buf.append(self[dirname].manifest_text( 814 stream_name=os.path.join(stream_name, dirname), 815 strip=strip, normalize=True, only_committed=only_committed)) 816 return "".join(buf) 817 else: 818 if strip: 819 return self.stripped_manifest() 820 else: 821 return self._manifest_text 822 823 @synchronized 824 def _copy_remote_blocks(self, remote_blocks={}): 825 """Scan through the entire collection and ask Keep to copy remote blocks. 826 827 When accessing a remote collection, blocks will have a remote signature 828 (+R instead of +A). Collect these signatures and request Keep to copy the 829 blocks to the local cluster, returning local (+A) signatures. 
830 831 :remote_blocks: 832 Shared cache of remote to local block mappings. This is used to avoid 833 doing extra work when blocks are shared by more than one file in 834 different subdirectories. 835 836 """ 837 for item in self: 838 remote_blocks = self[item]._copy_remote_blocks(remote_blocks) 839 return remote_blocks 840 841 @synchronized 842 def diff( 843 self, 844 end_collection: 'RichCollectionBase', 845 prefix: str=".", 846 holding_collection: Optional['Collection']=None, 847 ) -> ChangeList: 848 """Build a list of differences between this collection and another 849 850 Arguments: 851 852 * end_collection: arvados.collection.RichCollectionBase --- A 853 collection object with the desired end state. The returned diff 854 list will describe how to go from the current collection object 855 `self` to `end_collection`. 856 857 * prefix: str --- The name to use for this collection's stream in 858 the diff list. Default `'.'`. 859 860 * holding_collection: arvados.collection.Collection | None --- A 861 collection object used to hold objects for the returned diff 862 list. By default, a new empty collection is created. 
863 """ 864 changes = [] 865 if holding_collection is None: 866 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep()) 867 for k in self: 868 if k not in end_collection: 869 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, ""))) 870 for k in end_collection: 871 if k in self: 872 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection): 873 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection)) 874 elif end_collection[k] != self[k]: 875 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 876 else: 877 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 878 else: 879 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, ""))) 880 return changes 881 882 @must_be_writable 883 @synchronized 884 def apply(self, changes: ChangeList) -> None: 885 """Apply a list of changes from to this collection 886 887 This method takes a list of changes generated by 888 `RichCollectionBase.diff` and applies it to this 889 collection. Afterward, the state of this collection object will 890 match the state of `end_collection` passed to `diff`. If a change 891 conflicts with a local change, it will be saved to an alternate path 892 indicating the conflict. 893 894 Arguments: 895 896 * changes: arvados.collection.ChangeList --- The list of differences 897 generated by `RichCollectionBase.diff`. 
898 """ 899 if changes: 900 self.set_committed(False) 901 for change in changes: 902 event_type = change[0] 903 path = change[1] 904 initial = change[2] 905 local = self.find(path) 906 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S", 907 time.gmtime())) 908 if event_type == ADD: 909 if local is None: 910 # No local file at path, safe to copy over new file 911 self.copy(initial, path) 912 elif local is not None and local != initial: 913 # There is already local file and it is different: 914 # save change to conflict file. 915 self.copy(initial, conflictpath) 916 elif event_type == MOD or event_type == TOK: 917 final = change[3] 918 if local == initial: 919 # Local matches the "initial" item so it has not 920 # changed locally and is safe to update. 921 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile): 922 # Replace contents of local file with new contents 923 local.replace_contents(final) 924 else: 925 # Overwrite path with new item; this can happen if 926 # path was a file and is now a collection or vice versa 927 self.copy(final, path, overwrite=True) 928 else: 929 # Local is missing (presumably deleted) or local doesn't 930 # match the "start" value, so save change to conflict file 931 self.copy(final, conflictpath) 932 elif event_type == DEL: 933 if local == initial: 934 # Local item matches "initial" value, so it is safe to remove. 935 self.remove(path, recursive=True) 936 # else, the file is modified or already removed, in either 937 # case we don't want to try to remove it. 938 939 def portable_data_hash(self) -> str: 940 """Get the portable data hash for this collection's manifest""" 941 if self._manifest_locator and self.committed(): 942 # If the collection is already saved on the API server, and it's committed 943 # then return API server's PDH response. 
944 return self._portable_data_hash 945 else: 946 stripped = self.portable_manifest_text().encode() 947 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) 948 949 @synchronized 950 def subscribe(self, callback: ChangeCallback) -> None: 951 """Set a notify callback for changes to this collection 952 953 Arguments: 954 955 * callback: arvados.collection.ChangeCallback --- The callable to 956 call each time the collection is changed. 957 """ 958 if self._callback is None: 959 self._callback = callback 960 else: 961 raise errors.ArgumentError("A callback is already set on this collection.") 962 963 @synchronized 964 def unsubscribe(self) -> None: 965 """Remove any notify callback set for changes to this collection""" 966 if self._callback is not None: 967 self._callback = None 968 969 @synchronized 970 def notify( 971 self, 972 event: ChangeType, 973 collection: 'RichCollectionBase', 974 name: str, 975 item: CollectionItem, 976 ) -> None: 977 """Notify any subscribed callback about a change to this collection 978 979 .. ATTENTION:: Internal 980 This method is only meant to be used by other Collection methods. 981 982 If a callback has been registered with `RichCollectionBase.subscribe`, 983 it will be called with information about a change to this collection. 984 Then this notification will be propagated to this collection's root. 985 986 Arguments: 987 988 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to 989 the collection. 990 991 * collection: arvados.collection.RichCollectionBase --- The 992 collection that was modified. 993 994 * name: str --- The name of the file or stream within `collection` that 995 was modified. 996 997 * item: arvados.arvfile.ArvadosFile | 998 arvados.collection.Subcollection --- The new contents at `name` 999 within `collection`. 
1000 """ 1001 if self._callback: 1002 self._callback(event, collection, name, item) 1003 self.root_collection().notify(event, collection, name, item) 1004 1005 @synchronized 1006 def __eq__(self, other: Any) -> bool: 1007 """Indicate whether this collection object is equal to another""" 1008 if other is self: 1009 return True 1010 if not isinstance(other, RichCollectionBase): 1011 return False 1012 if len(self._items) != len(other): 1013 return False 1014 for k in self._items: 1015 if k not in other: 1016 return False 1017 if self._items[k] != other[k]: 1018 return False 1019 return True 1020 1021 def __ne__(self, other: Any) -> bool: 1022 """Indicate whether this collection object is not equal to another""" 1023 return not self.__eq__(other) 1024 1025 @synchronized 1026 def flush(self) -> None: 1027 """Upload any pending data to Keep""" 1028 for e in listvalues(self): 1029 e.flush()
Base class for Collection classes
186 def writable(self) -> bool: 187 """Indicate whether this collection object can be modified 188 189 This method returns `False` if this object is a `CollectionReader`, 190 else `True`. 191 """ 192 raise NotImplementedError()
Indicate whether this collection object can be modified
This method returns False
if this object is a CollectionReader
,
else True
.
194 def root_collection(self) -> 'Collection': 195 """Get this collection's root collection object 196 197 If you open a subcollection with `Collection.find`, calling this method 198 on that subcollection returns the source Collection object. 199 """ 200 raise NotImplementedError()
Get this collection’s root collection object
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the source Collection object.
202 def stream_name(self) -> str: 203 """Get the name of the manifest stream represented by this collection 204 205 If you open a subcollection with `Collection.find`, calling this method 206 on that subcollection returns the name of the stream you opened. 207 """ 208 raise NotImplementedError()
Get the name of the manifest stream represented by this collection
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the name of the stream you opened.
210 @synchronized 211 def has_remote_blocks(self) -> bool: 212 """Indiciate whether the collection refers to remote data 213 214 Returns `True` if the collection manifest includes any Keep locators 215 with a remote hint (`+R`), else `False`. 216 """ 217 if self._has_remote_blocks: 218 return True 219 for item in self: 220 if self[item].has_remote_blocks(): 221 return True 222 return False
Indicate whether the collection refers to remote data
Returns True
if the collection manifest includes any Keep locators
with a remote hint (+R
), else False
.
224 @synchronized 225 def set_has_remote_blocks(self, val: bool) -> None: 226 """Cache whether this collection refers to remote blocks 227 228 .. ATTENTION:: Internal 229 This method is only meant to be used by other Collection methods. 230 231 Set this collection's cached "has remote blocks" flag to the given 232 value. 233 """ 234 self._has_remote_blocks = val 235 if self.parent: 236 self.parent.set_has_remote_blocks(val)
Cache whether this collection refers to remote blocks
Set this collection’s cached “has remote blocks” flag to the given value.
238 @must_be_writable 239 @synchronized 240 def find_or_create( 241 self, 242 path: str, 243 create_type: CreateType, 244 ) -> CollectionItem: 245 """Get the item at the given path, creating it if necessary 246 247 If `path` refers to a stream in this collection, returns a 248 corresponding `Subcollection` object. If `path` refers to a file in 249 this collection, returns a corresponding 250 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 251 this collection, then this method creates a new object and returns 252 it, creating parent streams as needed. The type of object created is 253 determined by the value of `create_type`. 254 255 Arguments: 256 257 * path: str --- The path to find or create within this collection. 258 259 * create_type: Literal[COLLECTION, FILE] --- The type of object to 260 create at `path` if one does not exist. Passing `COLLECTION` 261 creates a stream and returns the corresponding 262 `Subcollection`. Passing `FILE` creates a new file and returns the 263 corresponding `arvados.arvfile.ArvadosFile`. 264 """ 265 pathcomponents = path.split("/", 1) 266 if pathcomponents[0]: 267 item = self._items.get(pathcomponents[0]) 268 if len(pathcomponents) == 1: 269 if item is None: 270 # create new file 271 if create_type == COLLECTION: 272 item = Subcollection(self, pathcomponents[0]) 273 else: 274 item = ArvadosFile(self, pathcomponents[0]) 275 self._items[pathcomponents[0]] = item 276 self.set_committed(False) 277 self.notify(ADD, self, pathcomponents[0], item) 278 return item 279 else: 280 if item is None: 281 # create new collection 282 item = Subcollection(self, pathcomponents[0]) 283 self._items[pathcomponents[0]] = item 284 self.set_committed(False) 285 self.notify(ADD, self, pathcomponents[0], item) 286 if isinstance(item, RichCollectionBase): 287 return item.find_or_create(pathcomponents[1], create_type) 288 else: 289 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 290 else: 291 return self
Get the item at the given path, creating it if necessary
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method creates a new object and returns
it, creating parent streams as needed. The type of object created is
determined by the value of create_type
.
Arguments:
path: str — The path to find or create within this collection.
create_type: Literal[COLLECTION, FILE] — The type of object to create at
path
if one does not exist. PassingCOLLECTION
creates a stream and returns the correspondingSubcollection
. PassingFILE
creates a new file and returns the correspondingarvados.arvfile.ArvadosFile
.
293 @synchronized 294 def find(self, path: str) -> CollectionItem: 295 """Get the item at the given path 296 297 If `path` refers to a stream in this collection, returns a 298 corresponding `Subcollection` object. If `path` refers to a file in 299 this collection, returns a corresponding 300 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 301 this collection, then this method raises `NotADirectoryError`. 302 303 Arguments: 304 305 * path: str --- The path to find or create within this collection. 306 """ 307 if not path: 308 raise errors.ArgumentError("Parameter 'path' is empty.") 309 310 pathcomponents = path.split("/", 1) 311 if pathcomponents[0] == '': 312 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 313 314 item = self._items.get(pathcomponents[0]) 315 if item is None: 316 return None 317 elif len(pathcomponents) == 1: 318 return item 319 else: 320 if isinstance(item, RichCollectionBase): 321 if pathcomponents[1]: 322 return item.find(pathcomponents[1]) 323 else: 324 return item 325 else: 326 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
Get the item at the given path
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method raises NotADirectoryError
.
Arguments:
- path: str — The path to find within this collection.
328 @synchronized 329 def mkdirs(self, path: str) -> 'Subcollection': 330 """Create and return a subcollection at `path` 331 332 If `path` exists within this collection, raises `FileExistsError`. 333 Otherwise, creates a stream at that path and returns the 334 corresponding `Subcollection`. 335 """ 336 if self.find(path) != None: 337 raise IOError(errno.EEXIST, "Directory or file exists", path) 338 339 return self.find_or_create(path, COLLECTION)
Create and return a subcollection at path
If path
exists within this collection, raises FileExistsError
.
Otherwise, creates a stream at that path and returns the
corresponding Subcollection
.
341 def open( 342 self, 343 path: str, 344 mode: str="r", 345 encoding: Optional[str]=None, 346 ) -> IO: 347 """Open a file-like object within the collection 348 349 This method returns a file-like object that can read and/or write the 350 file located at `path` within the collection. If you attempt to write 351 a `path` that does not exist, the file is created with `find_or_create`. 352 If the file cannot be opened for any other reason, this method raises 353 `OSError` with an appropriate errno. 354 355 Arguments: 356 357 * path: str --- The path of the file to open within this collection 358 359 * mode: str --- The mode to open this file. Supports all the same 360 values as `builtins.open`. 361 362 * encoding: str | None --- The text encoding of the file. Only used 363 when the file is opened in text mode. The default is 364 platform-dependent. 365 """ 366 if not re.search(r'^[rwa][bt]?\+?$', mode): 367 raise errors.ArgumentError("Invalid mode {!r}".format(mode)) 368 369 if mode[0] == 'r' and '+' not in mode: 370 fclass = ArvadosFileReader 371 arvfile = self.find(path) 372 elif not self.writable(): 373 raise IOError(errno.EROFS, "Collection is read only") 374 else: 375 fclass = ArvadosFileWriter 376 arvfile = self.find_or_create(path, FILE) 377 378 if arvfile is None: 379 raise IOError(errno.ENOENT, "File not found", path) 380 if not isinstance(arvfile, ArvadosFile): 381 raise IOError(errno.EISDIR, "Is a directory", path) 382 383 if mode[0] == 'w': 384 arvfile.truncate(0) 385 386 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) 387 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) 388 if 'b' not in mode: 389 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader 390 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) 391 return f
Open a file-like object within the collection
This method returns a file-like object that can read and/or write the
file located at path
within the collection. If you attempt to write
a path
that does not exist, the file is created with find_or_create
.
If the file cannot be opened for any other reason, this method raises
OSError
with an appropriate errno.
Arguments:
path: str — The path of the file to open within this collection
mode: str — The mode to open this file. Supports all the same values as
builtins.open
.encoding: str | None — The text encoding of the file. Only used when the file is opened in text mode. The default is platform-dependent.
393 def modified(self) -> bool: 394 """Indicate whether this collection has an API server record 395 396 Returns `False` if this collection corresponds to a record loaded from 397 the API server, `True` otherwise. 398 """ 399 return not self.committed()
Indicate whether this collection has an API server record
Returns False
if this collection corresponds to a record loaded from
the API server, True
otherwise.
401 @synchronized 402 def committed(self): 403 """Indicate whether this collection has an API server record 404 405 Returns `True` if this collection corresponds to a record loaded from 406 the API server, `False` otherwise. 407 """ 408 return self._committed
Indicate whether this collection has an API server record
Returns True
if this collection corresponds to a record loaded from
the API server, False
otherwise.
410 @synchronized 411 def set_committed(self, value: bool=True): 412 """Cache whether this collection has an API server record 413 414 .. ATTENTION:: Internal 415 This method is only meant to be used by other Collection methods. 416 417 Set this collection's cached "committed" flag to the given 418 value and propagates it as needed. 419 """ 420 if value == self._committed: 421 return 422 if value: 423 for k,v in listitems(self._items): 424 v.set_committed(True) 425 self._committed = True 426 else: 427 self._committed = False 428 if self.parent is not None: 429 self.parent.set_committed(False)
Cache whether this collection has an API server record
Set this collection’s cached “committed” flag to the given value and propagates it as needed.
479 @synchronized 480 def keys(self) -> Iterator[str]: 481 """Iterate names of streams and files in this collection 482 483 This method does not recurse. It only iterates the contents of this 484 collection's corresponding stream. 485 """ 486 return self._items.keys()
Iterate names of streams and files in this collection
This method does not recurse. It only iterates the contents of this collection’s corresponding stream.
488 @synchronized 489 def values(self) -> List[CollectionItem]: 490 """Get a list of objects in this collection's stream 491 492 The return value includes a `Subcollection` for every stream, and an 493 `arvados.arvfile.ArvadosFile` for every file, directly within this 494 collection's stream. This method does not recurse. 495 """ 496 return listvalues(self._items)
Get a list of objects in this collection's stream

The return value includes a `Subcollection` for every stream, and an
`arvados.arvfile.ArvadosFile` for every file, directly within this
collection's stream. This method does not recurse.
@synchronized
def items(self) -> List[Tuple[str, CollectionItem]]:
    """List `(name, object)` pairs from this collection's stream

    Every stream appears as a `Subcollection` and every file as an
    `arvados.arvfile.ArvadosFile`. Only this stream's immediate
    contents are returned; there is no recursion.
    """
    # Return a fresh list so callers can mutate it freely.
    return list(self._items.items())
Get a list of `(name, object)` tuples from this collection's stream

The return value includes a `Subcollection` for every stream, and an
`arvados.arvfile.ArvadosFile` for every file, directly within this
collection's stream. This method does not recurse.
def exists(self, path: str) -> bool:
    """Indicate whether this collection includes an item at `path`

    Returns `True` when `path` names a stream or file inside this
    collection, and `False` otherwise.

    Arguments:

    * path: str --- The path to check for existence within this collection
    """
    found = self.find(path)
    return found is not None
Indicate whether this collection includes an item at `path`

This method returns `True` if `path` refers to a stream or file within
this collection, else `False`.

Arguments:

- path: str — The path to check for existence within this collection
@must_be_writable
@synchronized
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the file or stream at `path`

    Arguments:

    * path: str --- The path of the item to remove from the collection

    * recursive: bool --- Controls the method's behavior if `path` refers
      to a nonempty stream. If `False` (the default), this method raises
      `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
      items under the stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    pathcomponents = path.split("/", 1)
    # `item` is the entry for the first path component; reuse it below
    # instead of re-looking it up in self._items three more times.
    item = self._items.get(pathcomponents[0])
    if item is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if len(pathcomponents) == 1:
        # The path names an entry directly in this stream.
        if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
            raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
        del self._items[pathcomponents[0]]
        self.set_committed(False)
        self.notify(DEL, self, pathcomponents[0], item)
    else:
        # Delegate removal of the rest of the path to the subcollection.
        item.remove(pathcomponents[1], recursive=recursive)
Remove the file or stream at `path`

Arguments:

- path: str — The path of the item to remove from the collection
- recursive: bool — Controls the method's behavior if `path` refers to a
  nonempty stream. If `False` (the default), this method raises `OSError`
  with errno `ENOTEMPTY`. If `True`, this method removes all items under
  the stream.
@must_be_writable
@synchronized
def add(
    self,
    source_obj: CollectionItem,
    target_name: str,
    overwrite: bool=False,
    reparent: bool=False,
) -> None:
    """Copy or move a file or subcollection object into this collection

    Arguments:

    * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file
      or subcollection to add to this collection

    * target_name: str --- The path inside this collection where
      `source_obj` should be added.

    * overwrite: bool --- What to do when this collection already has an
      object at `target_name`. With `False` (the default) this method
      raises `FileExistsError`; with `True` the existing object is
      replaced by `source_obj`.

    * reparent: bool --- With `False` (the default) `source_obj` is copied
      into this collection; with `True` it is moved here instead.
    """
    modified_from = None
    if target_name in self:
        if not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)
        modified_from = self[target_name]

    # Perform the actual move or copy.
    if reparent:
        source_obj._reparent(self, target_name)
        item = source_obj
    else:
        item = source_obj.clone(self, target_name)

    self._items[target_name] = item
    self.set_committed(False)
    if not self._has_remote_blocks and source_obj.has_remote_blocks():
        self.set_has_remote_blocks(True)

    if modified_from:
        self.notify(MOD, self, target_name, (modified_from, item))
    else:
        self.notify(ADD, self, target_name, item)
Copy or move a file or subcollection object to this collection

Arguments:

- source_obj: arvados.arvfile.ArvadosFile | Subcollection — The file or
  subcollection to add to this collection
- target_name: str — The path inside this collection where `source_obj`
  should be added.
- overwrite: bool — Controls the behavior of this method when the
  collection already contains an object at `target_name`. If `False` (the
  default), this method will raise `FileExistsError`. If `True`, the
  object at `target_name` will be replaced with `source_obj`.
- reparent: bool — Controls whether this method copies or moves
  `source_obj`. If `False` (the default), `source_obj` is copied into
  this collection. If `True`, `source_obj` is moved into this collection.
@must_be_writable
@synchronized
def copy(
    self,
    source: Union[str, CollectionItem],
    target_path: str,
    source_collection: Optional['RichCollectionBase']=None,
    overwrite: bool=False,
) -> None:
    """Copy a file or subcollection object into this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      add. When `source` is a str it is looked up as a path inside
      `source_collection` (see below).

    * target_path: str --- The path inside this collection where the
      source object should be added.

    * source_collection: arvados.collection.Collection | None --- The
      collection used to resolve `source` when it is a path. Defaults to
      this collection (`self`).

    * overwrite: bool --- What to do when this collection already has an
      object at `target_path`. With `False` (the default) this method
      raises `FileExistsError`; with `True` the existing object is
      replaced.
    """
    found_obj, dest_dir, dest_name = self._get_src_target(
        source, target_path, source_collection, True)
    # Last argument False: add() copies rather than reparents.
    dest_dir.add(found_obj, dest_name, overwrite, False)
Copy a file or subcollection object to this collection

Arguments:

- source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or
  subcollection to add to this collection. If `source` is a str, the
  object will be found by looking up this path from `source_collection`
  (see below).
- target_path: str — The path inside this collection where the source
  object should be added.
- source_collection: Collection | None — The collection to find the
  source object from when `source` is a path. Defaults to the current
  collection (`self`).
- overwrite: bool — Controls the behavior of this method when the
  collection already contains an object at `target_path`. If `False` (the
  default), this method will raise `FileExistsError`. If `True`, the
  object at `target_path` will be replaced.
@must_be_writable
@synchronized
def rename(
    self,
    source: Union[str, CollectionItem],
    target_path: str,
    source_collection: Optional['RichCollectionBase']=None,
    overwrite: bool=False,
) -> None:
    """Move a file or subcollection object into this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      add. When `source` is a str it is looked up as a path inside
      `source_collection` (see below).

    * target_path: str --- The path inside this collection where the
      source object should be added.

    * source_collection: arvados.collection.Collection | None --- The
      collection used to resolve `source` when it is a path. Defaults to
      this collection (`self`).

    * overwrite: bool --- What to do when this collection already has an
      object at `target_path`. With `False` (the default) this method
      raises `FileExistsError`; with `True` the existing object is
      replaced.
    """
    src_obj, dest_dir, dest_name = self._get_src_target(
        source, target_path, source_collection, False)
    if not src_obj.writable():
        # A move removes the source, so the source must be writable.
        raise IOError(errno.EROFS, "Source collection is read only", source)
    # Last argument True: add() reparents (moves) rather than copies.
    dest_dir.add(src_obj, dest_name, overwrite, True)
Move a file or subcollection object to this collection

Arguments:

- source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or
  subcollection to add to this collection. If `source` is a str, the
  object will be found by looking up this path from `source_collection`
  (see below).
- target_path: str — The path inside this collection where the source
  object should be added.
- source_collection: Collection | None — The collection to find the
  source object from when `source` is a path. Defaults to the current
  collection (`self`).
- overwrite: bool — Controls the behavior of this method when the
  collection already contains an object at `target_path`. If `False` (the
  default), this method will raise `FileExistsError`. If `True`, the
  object at `target_path` will be replaced.
def portable_manifest_text(self, stream_name: str=".") -> str:
    """Get the portable manifest text for this collection

    The portable manifest text is normalized and contains no access
    tokens. This method does not flush outstanding blocks to Keep.

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.
    """
    # strip=True drops access tokens; normalize=True canonicalizes.
    text = self._get_manifest_text(stream_name, True, True)
    return text
Get the portable manifest text for this collection

The portable manifest text is normalized, and does not include access
tokens. This method does not flush outstanding blocks to Keep.

Arguments:

- stream_name: str — The name to use for this collection's stream in the
  generated manifest. Default `'.'`.
@synchronized
def manifest_text(
    self,
    stream_name: str=".",
    strip: bool=False,
    normalize: bool=False,
    only_committed: bool=False,
) -> str:
    """Get the manifest text for this collection

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.

    * strip: bool --- With `False` (the default) the manifest text keeps
      access tokens; with `True` they are removed.

    * normalize: bool --- Whether the returned manifest text is
      normalized. Default `False`.

    * only_committed: bool --- With `False` (the default) this method
      finishes uploading all pending data to Keep first, then returns the
      final manifest. With `True` it builds a manifest referring only to
      data already uploaded when the method was called.
    """
    if not only_committed:
        # Make sure every pending block is in Keep so the manifest
        # references final block locators.
        self._my_block_manager().commit_all()
    return self._get_manifest_text(
        stream_name, strip, normalize, only_committed=only_committed)
Get the manifest text for this collection

Arguments:

- stream_name: str — The name to use for this collection's stream in the
  generated manifest. Default `'.'`.
- strip: bool — Controls whether or not the returned manifest text
  includes access tokens. If `False` (the default), the manifest text
  will include access tokens. If `True`, it will not.
- normalize: bool — Controls whether or not the returned manifest text is
  normalized. Default `False`.
- only_committed: bool — Controls whether or not this method uploads
  pending data to Keep before building and returning the manifest text.
  If `False` (the default), this method will finish uploading all data to
  Keep, then return the final manifest. If `True`, this method will build
  and return a manifest that only refers to the data that has finished
  uploading at the time this method was called.
@synchronized
def diff(
        self,
        end_collection: 'RichCollectionBase',
        prefix: str=".",
        holding_collection: Optional['Collection']=None,
) -> ChangeList:
    """Build a list of differences between this collection and another

    Arguments:

    * end_collection: arvados.collection.RichCollectionBase --- A
      collection object with the desired end state. The returned diff
      list will describe how to go from the current collection object
      `self` to `end_collection`.

    * prefix: str --- The name to use for this collection's stream in
      the diff list. Default `'.'`.

    * holding_collection: arvados.collection.Collection | None --- A
      collection object used to hold objects for the returned diff
      list. By default, a new empty collection is created.
    """
    changes = []
    if holding_collection is None:
        holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
    # Pass 1: anything present here but absent from end_collection is a DEL.
    for k in self:
        if k not in end_collection:
            changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
    # Pass 2: walk end_collection to find additions and modifications.
    for k in end_collection:
        if k in self:
            if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                # Both sides are streams: recurse and splice the sub-diff in.
                changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
            elif end_collection[k] != self[k]:
                # Content differs: record a modification with both versions.
                changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                # Content is equal; TOK appears to mark a token-only
                # refresh -- confirm against the TOK constant's docs.
                changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
        else:
            changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
    return changes
Build a list of differences between this collection and another

Arguments:

- end_collection: RichCollectionBase — A collection object with the
  desired end state. The returned diff list will describe how to go from
  the current collection object `self` to `end_collection`.
- prefix: str — The name to use for this collection's stream in the diff
  list. Default `'.'`.
- holding_collection: Collection | None — A collection object used to
  hold objects for the returned diff list. By default, a new empty
  collection is created.
@must_be_writable
@synchronized
def apply(self, changes: ChangeList) -> None:
    """Apply a list of changes to this collection

    This method takes a list of changes generated by
    `RichCollectionBase.diff` and applies it to this
    collection. Afterward, the state of this collection object will
    match the state of `end_collection` passed to `diff`. If a change
    conflicts with a local change, it will be saved to an alternate path
    indicating the conflict.

    Arguments:

    * changes: arvados.collection.ChangeList --- The list of differences
      generated by `RichCollectionBase.diff`.
    """
    if changes:
        self.set_committed(False)
    for change in changes:
        # Each change tuple is (event_type, path, initial[, final]).
        event_type = change[0]
        path = change[1]
        initial = change[2]
        local = self.find(path)
        # Conflicting local edits are preserved under a timestamped name.
        conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                time.gmtime()))
        if event_type == ADD:
            if local is None:
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
            elif local is not None and local != initial:
                # There is already local file and it is different:
                # save change to conflict file.
                self.copy(initial, conflictpath)
        elif event_type == MOD or event_type == TOK:
            final = change[3]
            if local == initial:
                # Local matches the "initial" item so it has not
                # changed locally and is safe to update.
                if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                    # Replace contents of local file with new contents
                    local.replace_contents(final)
                else:
                    # Overwrite path with new item; this can happen if
                    # path was a file and is now a collection or vice versa
                    self.copy(final, path, overwrite=True)
            else:
                # Local is missing (presumably deleted) or local doesn't
                # match the "start" value, so save change to conflict file
                self.copy(final, conflictpath)
        elif event_type == DEL:
            if local == initial:
                # Local item matches "initial" value, so it is safe to remove.
                self.remove(path, recursive=True)
            # else, the file is modified or already removed, in either
            # case we don't want to try to remove it.
Apply a list of changes to this collection

This method takes a list of changes generated by
`RichCollectionBase.diff` and applies it to this collection. Afterward,
the state of this collection object will match the state of
`end_collection` passed to `diff`. If a change conflicts with a local
change, it will be saved to an alternate path indicating the conflict.

Arguments:

- changes: arvados.collection.ChangeList — The list of differences
  generated by `RichCollectionBase.diff`.
def portable_data_hash(self) -> str:
    """Get the portable data hash for this collection's manifest"""
    if self._manifest_locator and self.committed():
        # The collection is saved on the API server and unchanged since,
        # so reuse the PDH the server reported.
        return self._portable_data_hash
    # Otherwise compute it locally from the stripped, normalized manifest.
    data = self.portable_manifest_text().encode()
    return f'{hashlib.md5(data).hexdigest()}+{len(data)}'
Get the portable data hash for this collection’s manifest
@synchronized
def subscribe(self, callback: ChangeCallback) -> None:
    """Register a notify callback for changes to this collection

    Arguments:

    * callback: arvados.collection.ChangeCallback --- The callable to
      call each time the collection is changed.
    """
    # Only one callback may be registered at a time.
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback
Set a notify callback for changes to this collection
Arguments:
- callback: arvados.collection.ChangeCallback — The callable to call each time the collection is changed.
@synchronized
def unsubscribe(self) -> None:
    """Remove any notify callback set for changes to this collection"""
    if self._callback is None:
        return
    self._callback = None
Remove any notify callback set for changes to this collection
@synchronized
def notify(
    self,
    event: ChangeType,
    collection: 'RichCollectionBase',
    name: str,
    item: CollectionItem,
) -> None:
    """Notify any subscribed callback about a change to this collection

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    If a callback was registered with `RichCollectionBase.subscribe`, it
    is invoked with the change details, and the notification is then
    forwarded to this collection's root.

    Arguments:

    * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
      the collection.

    * collection: arvados.collection.RichCollectionBase --- The
      collection that was modified.

    * name: str --- The name of the file or stream within `collection`
      that was modified.

    * item: arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The new contents at `name`
      within `collection`.
    """
    callback = self._callback
    if callback:
        callback(event, collection, name, item)
    # Propagate upward so subscribers on the root also hear about it.
    self.root_collection().notify(event, collection, name, item)
Notify any subscribed callback about a change to this collection

If a callback has been registered with `RichCollectionBase.subscribe`,
it will be called with information about a change to this collection.
Then this notification will be propagated to this collection's root.

Arguments:

- event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the
  collection.
- collection: RichCollectionBase — The collection that was modified.
- name: str — The name of the file or stream within `collection` that
  was modified.
- item: arvados.arvfile.ArvadosFile | Subcollection — The new contents
  at `name` within `collection`.
@synchronized
def flush(self) -> None:
    """Upload any pending data to Keep"""
    # values() already returns a fresh list, so flushing entries cannot
    # invalidate the iteration.
    for entry in self.values():
        entry.flush()
Upload any pending data to Keep
Inherited Members
1032class Collection(RichCollectionBase): 1033 """Read and manipulate an Arvados collection 1034 1035 This class provides a high-level interface to create, read, and update 1036 Arvados collections and their contents. Refer to the Arvados Python SDK 1037 cookbook for [an introduction to using the Collection class][cookbook]. 1038 1039 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 1040 """ 1041 1042 def __init__(self, manifest_locator_or_text: Optional[str]=None, 1043 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, 1044 keep_client: Optional['arvados.keep.KeepClient']=None, 1045 num_retries: int=10, 1046 parent: Optional['Collection']=None, 1047 apiconfig: Optional[Mapping[str, str]]=None, 1048 block_manager: Optional['arvados.arvfile._BlockManager']=None, 1049 replication_desired: Optional[int]=None, 1050 storage_classes_desired: Optional[List[str]]=None, 1051 put_threads: Optional[int]=None): 1052 """Initialize a Collection object 1053 1054 Arguments: 1055 1056 * manifest_locator_or_text: str | None --- This string can contain a 1057 collection manifest text, portable data hash, or UUID. When given a 1058 portable data hash or UUID, this instance will load a collection 1059 record from the API server. Otherwise, this instance will represent a 1060 new collection without an API server record. The default value `None` 1061 instantiates a new collection with an empty manifest. 1062 1063 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The 1064 Arvados API client object this instance uses to make requests. If 1065 none is given, this instance creates its own client using the 1066 settings from `apiconfig` (see below). If your client instantiates 1067 many Collection objects, you can help limit memory utilization by 1068 calling `arvados.api.api` to construct an 1069 `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client` 1070 for every Collection. 
1071 1072 * keep_client: arvados.keep.KeepClient | None --- The Keep client 1073 object this instance uses to make requests. If none is given, this 1074 instance creates its own client using its `api_client`. 1075 1076 * num_retries: int --- The number of times that client requests are 1077 retried. Default 10. 1078 1079 * parent: arvados.collection.Collection | None --- The parent Collection 1080 object of this instance, if any. This argument is primarily used by 1081 other Collection methods; user client code shouldn't need to use it. 1082 1083 * apiconfig: Mapping[str, str] | None --- A mapping with entries for 1084 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally 1085 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the 1086 Collection object constructs one from these settings. If no 1087 mapping is provided, calls `arvados.config.settings` to get these 1088 parameters from user configuration. 1089 1090 * block_manager: arvados.arvfile._BlockManager | None --- The 1091 _BlockManager object used by this instance to coordinate reading 1092 and writing Keep data blocks. If none is given, this instance 1093 constructs its own. This argument is primarily used by other 1094 Collection methods; user client code shouldn't need to use it. 1095 1096 * replication_desired: int | None --- This controls both the value of 1097 the `replication_desired` field on API collection records saved by 1098 this class, as well as the number of Keep services that the object 1099 writes new data blocks to. If none is given, uses the default value 1100 configured for the cluster. 1101 1102 * storage_classes_desired: list[str] | None --- This controls both 1103 the value of the `storage_classes_desired` field on API collection 1104 records saved by this class, as well as selecting which specific 1105 Keep services the object writes new data blocks to. If none is 1106 given, defaults to an empty list. 
1107 1108 * put_threads: int | None --- The number of threads to run 1109 simultaneously to upload data blocks to Keep. This value is used when 1110 building a new `block_manager`. It is unused when a `block_manager` 1111 is provided. 1112 """ 1113 1114 if storage_classes_desired and type(storage_classes_desired) is not list: 1115 raise errors.ArgumentError("storage_classes_desired must be list type.") 1116 1117 super(Collection, self).__init__(parent) 1118 self._api_client = api_client 1119 self._keep_client = keep_client 1120 1121 # Use the keep client from ThreadSafeApiCache 1122 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache): 1123 self._keep_client = self._api_client.keep 1124 1125 self._block_manager = block_manager 1126 self.replication_desired = replication_desired 1127 self._storage_classes_desired = storage_classes_desired 1128 self.put_threads = put_threads 1129 1130 if apiconfig: 1131 self._config = apiconfig 1132 else: 1133 self._config = config.settings() 1134 1135 self.num_retries = num_retries 1136 self._manifest_locator = None 1137 self._manifest_text = None 1138 self._portable_data_hash = None 1139 self._api_response = None 1140 self._past_versions = set() 1141 1142 self.lock = threading.RLock() 1143 self.events = None 1144 1145 if manifest_locator_or_text: 1146 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1147 self._manifest_locator = manifest_locator_or_text 1148 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1149 self._manifest_locator = manifest_locator_or_text 1150 if not self._has_local_collection_uuid(): 1151 self._has_remote_blocks = True 1152 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1153 self._manifest_text = manifest_locator_or_text 1154 if '+R' in self._manifest_text: 1155 self._has_remote_blocks = True 1156 else: 1157 raise errors.ArgumentError( 1158 "Argument to CollectionReader is not a manifest or a 
collection UUID") 1159 1160 try: 1161 self._populate() 1162 except errors.SyntaxError as e: 1163 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None 1164 1165 def storage_classes_desired(self) -> List[str]: 1166 """Get this collection's `storage_classes_desired` value""" 1167 return self._storage_classes_desired or [] 1168 1169 def root_collection(self) -> 'Collection': 1170 return self 1171 1172 def get_properties(self) -> Properties: 1173 """Get this collection's properties 1174 1175 This method always returns a dict. If this collection object does not 1176 have an associated API record, or that record does not have any 1177 properties set, this method returns an empty dict. 1178 """ 1179 if self._api_response and self._api_response["properties"]: 1180 return self._api_response["properties"] 1181 else: 1182 return {} 1183 1184 def get_trash_at(self) -> Optional[datetime.datetime]: 1185 """Get this collection's `trash_at` field 1186 1187 This method parses the `trash_at` field of the collection's API 1188 record and returns a datetime from it. If that field is not set, or 1189 this collection object does not have an associated API record, 1190 returns None. 1191 """ 1192 if self._api_response and self._api_response["trash_at"]: 1193 try: 1194 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1195 except ValueError: 1196 return None 1197 else: 1198 return None 1199 1200 def stream_name(self) -> str: 1201 return "." 1202 1203 def writable(self) -> bool: 1204 return True 1205 1206 @synchronized 1207 def known_past_version( 1208 self, 1209 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] 1210 ) -> bool: 1211 """Indicate whether an API record for this collection has been seen before 1212 1213 As this collection object loads records from the API server, it records 1214 their `modified_at` and `portable_data_hash` fields. 
This method accepts 1215 a 2-tuple with values for those fields, and returns `True` if the 1216 combination was previously loaded. 1217 """ 1218 return modified_at_and_portable_data_hash in self._past_versions 1219 1220 @synchronized 1221 @retry_method 1222 def update( 1223 self, 1224 other: Optional['Collection']=None, 1225 num_retries: Optional[int]=None, 1226 ) -> None: 1227 """Merge another collection's contents into this one 1228 1229 This method compares the manifest of this collection instance with 1230 another, then updates this instance's manifest with changes from the 1231 other, renaming files to flag conflicts where necessary. 1232 1233 When called without any arguments, this method reloads the collection's 1234 API record, and updates this instance with any changes that have 1235 appeared server-side. If this instance does not have a corresponding 1236 API record, this method raises `arvados.errors.ArgumentError`. 1237 1238 Arguments: 1239 1240 * other: arvados.collection.Collection | None --- The collection 1241 whose contents should be merged into this instance. When not 1242 provided, this method reloads this collection's API record and 1243 constructs a Collection object from it. If this instance does not 1244 have a corresponding API record, this method raises 1245 `arvados.errors.ArgumentError`. 1246 1247 * num_retries: int | None --- The number of times to retry reloading 1248 the collection's API record from the API server. If not specified, 1249 uses the `num_retries` provided when this instance was constructed. 
1250 """ 1251 if other is None: 1252 if self._manifest_locator is None: 1253 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1254 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1255 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and 1256 response.get("portable_data_hash") != self.portable_data_hash()): 1257 # The record on the server is different from our current one, but we've seen it before, 1258 # so ignore it because it's already been merged. 1259 # However, if it's the same as our current record, proceed with the update, because we want to update 1260 # our tokens. 1261 return 1262 else: 1263 self._remember_api_response(response) 1264 other = CollectionReader(response["manifest_text"]) 1265 baseline = CollectionReader(self._manifest_text) 1266 self.apply(baseline.diff(other)) 1267 self._manifest_text = self.manifest_text() 1268 1269 @synchronized 1270 def _my_api(self): 1271 if self._api_client is None: 1272 self._api_client = ThreadSafeApiCache(self._config, version='v1') 1273 if self._keep_client is None: 1274 self._keep_client = self._api_client.keep 1275 return self._api_client 1276 1277 @synchronized 1278 def _my_keep(self): 1279 if self._keep_client is None: 1280 if self._api_client is None: 1281 self._my_api() 1282 else: 1283 self._keep_client = KeepClient(api_client=self._api_client) 1284 return self._keep_client 1285 1286 @synchronized 1287 def _my_block_manager(self): 1288 if self._block_manager is None: 1289 copies = (self.replication_desired or 1290 self._my_api()._rootDesc.get('defaultCollectionReplication', 1291 2)) 1292 self._block_manager = _BlockManager(self._my_keep(), 1293 copies=copies, 1294 put_threads=self.put_threads, 1295 num_retries=self.num_retries, 1296 storage_classes_func=self.storage_classes_desired) 1297 return self._block_manager 1298 1299 def _remember_api_response(self, 
response): 1300 self._api_response = response 1301 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) 1302 1303 def _populate_from_api_server(self): 1304 # As in KeepClient itself, we must wait until the last 1305 # possible moment to instantiate an API client, in order to 1306 # avoid tripping up clients that don't have access to an API 1307 # server. If we do build one, make sure our Keep client uses 1308 # it. If instantiation fails, we'll fall back to the except 1309 # clause, just like any other Collection lookup 1310 # failure. Return an exception, or None if successful. 1311 self._remember_api_response(self._my_api().collections().get( 1312 uuid=self._manifest_locator).execute( 1313 num_retries=self.num_retries)) 1314 self._manifest_text = self._api_response['manifest_text'] 1315 self._portable_data_hash = self._api_response['portable_data_hash'] 1316 # If not overriden via kwargs, we should try to load the 1317 # replication_desired and storage_classes_desired from the API server 1318 if self.replication_desired is None: 1319 self.replication_desired = self._api_response.get('replication_desired', None) 1320 if self._storage_classes_desired is None: 1321 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None) 1322 1323 def _populate(self): 1324 if self._manifest_text is None: 1325 if self._manifest_locator is None: 1326 return 1327 else: 1328 self._populate_from_api_server() 1329 self._baseline_manifest = self._manifest_text 1330 self._import_manifest(self._manifest_text) 1331 1332 def _has_collection_uuid(self): 1333 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) 1334 1335 def _has_local_collection_uuid(self): 1336 return self._has_collection_uuid and \ 1337 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0] 1338 1339 def __enter__(self): 1340 return self 1341 1342 def __exit__(self, 
exc_type, exc_value, traceback): 1343 """Exit a context with this collection instance 1344 1345 If no exception was raised inside the context block, and this 1346 collection is writable and has a corresponding API record, that 1347 record will be updated to match the state of this instance at the end 1348 of the block. 1349 """ 1350 if exc_type is None: 1351 if self.writable() and self._has_collection_uuid(): 1352 self.save() 1353 self.stop_threads() 1354 1355 def stop_threads(self) -> None: 1356 """Stop background Keep upload/download threads""" 1357 if self._block_manager is not None: 1358 self._block_manager.stop_threads() 1359 1360 @synchronized 1361 def manifest_locator(self) -> Optional[str]: 1362 """Get this collection's manifest locator, if any 1363 1364 * If this collection instance is associated with an API record with a 1365 UUID, return that. 1366 * Otherwise, if this collection instance was loaded from an API record 1367 by portable data hash, return that. 1368 * Otherwise, return `None`. 1369 """ 1370 return self._manifest_locator 1371 1372 @synchronized 1373 def clone( 1374 self, 1375 new_parent: Optional['Collection']=None, 1376 new_name: Optional[str]=None, 1377 readonly: bool=False, 1378 new_config: Optional[Mapping[str, str]]=None, 1379 ) -> 'Collection': 1380 """Create a Collection object with the same contents as this instance 1381 1382 This method creates a new Collection object with contents that match 1383 this instance's. The new collection will not be associated with any API 1384 record. 1385 1386 Arguments: 1387 1388 * new_parent: arvados.collection.Collection | None --- This value is 1389 passed to the new Collection's constructor as the `parent` 1390 argument. 1391 1392 * new_name: str | None --- This value is unused. 1393 1394 * readonly: bool --- If this value is true, this method constructs and 1395 returns a `CollectionReader`. Otherwise, it returns a mutable 1396 `Collection`. Default `False`. 
1397 1398 * new_config: Mapping[str, str] | None --- This value is passed to the 1399 new Collection's constructor as `apiconfig`. If no value is provided, 1400 defaults to the configuration passed to this instance's constructor. 1401 """ 1402 if new_config is None: 1403 new_config = self._config 1404 if readonly: 1405 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1406 else: 1407 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1408 1409 newcollection._clonefrom(self) 1410 return newcollection 1411 1412 @synchronized 1413 def api_response(self) -> Optional[Dict[str, Any]]: 1414 """Get this instance's associated API record 1415 1416 If this Collection instance has an associated API record, return it. 1417 Otherwise, return `None`. 1418 """ 1419 return self._api_response 1420 1421 def find_or_create( 1422 self, 1423 path: str, 1424 create_type: CreateType, 1425 ) -> CollectionItem: 1426 if path == ".": 1427 return self 1428 else: 1429 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type) 1430 1431 def find(self, path: str) -> CollectionItem: 1432 if path == ".": 1433 return self 1434 else: 1435 return super(Collection, self).find(path[2:] if path.startswith("./") else path) 1436 1437 def remove(self, path: str, recursive: bool=False) -> None: 1438 if path == ".": 1439 raise errors.ArgumentError("Cannot remove '.'") 1440 else: 1441 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive) 1442 1443 @must_be_writable 1444 @synchronized 1445 @retry_method 1446 def save( 1447 self, 1448 properties: Optional[Properties]=None, 1449 storage_classes: Optional[StorageClasses]=None, 1450 trash_at: Optional[datetime.datetime]=None, 1451 merge: bool=True, 1452 num_retries: Optional[int]=None, 1453 preserve_version: bool=False, 1454 ) -> str: 1455 """Save collection to an existing API record 1456 1457 This method updates the instance's corresponding 
API record to match 1458 the instance's state. If this instance does not have a corresponding API 1459 record yet, raises `AssertionError`. (To create a new API record, use 1460 `Collection.save_new`.) This method returns the saved collection 1461 manifest. 1462 1463 Arguments: 1464 1465 * properties: dict[str, Any] | None --- If provided, the API record will 1466 be updated with these properties. Note this will completely replace 1467 any existing properties. 1468 1469 * storage_classes: list[str] | None --- If provided, the API record will 1470 be updated with this value in the `storage_classes_desired` field. 1471 This value will also be saved on the instance and used for any 1472 changes that follow. 1473 1474 * trash_at: datetime.datetime | None --- If provided, the API record 1475 will be updated with this value in the `trash_at` field. 1476 1477 * merge: bool --- If `True` (the default), this method will first 1478 reload this collection's API record, and merge any new contents into 1479 this instance before saving changes. See `Collection.update` for 1480 details. 1481 1482 * num_retries: int | None --- The number of times to retry reloading 1483 the collection's API record from the API server. If not specified, 1484 uses the `num_retries` provided when this instance was constructed. 1485 1486 * preserve_version: bool --- This value will be passed to directly 1487 to the underlying API call. If `True`, the Arvados API will 1488 preserve the versions of this collection both immediately before 1489 and after the update. If `True` when the API server is not 1490 configured with collection versioning, this method raises 1491 `arvados.errors.ArgumentError`. 
1492 """ 1493 if properties and type(properties) is not dict: 1494 raise errors.ArgumentError("properties must be dictionary type.") 1495 1496 if storage_classes and type(storage_classes) is not list: 1497 raise errors.ArgumentError("storage_classes must be list type.") 1498 if storage_classes: 1499 self._storage_classes_desired = storage_classes 1500 1501 if trash_at and type(trash_at) is not datetime.datetime: 1502 raise errors.ArgumentError("trash_at must be datetime type.") 1503 1504 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1505 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1506 1507 body={} 1508 if properties: 1509 body["properties"] = properties 1510 if self.storage_classes_desired(): 1511 body["storage_classes_desired"] = self.storage_classes_desired() 1512 if trash_at: 1513 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1514 body["trash_at"] = t 1515 if preserve_version: 1516 body["preserve_version"] = preserve_version 1517 1518 if not self.committed(): 1519 if self._has_remote_blocks: 1520 # Copy any remote blocks to the local cluster. 1521 self._copy_remote_blocks(remote_blocks={}) 1522 self._has_remote_blocks = False 1523 if not self._has_collection_uuid(): 1524 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") 1525 elif not self._has_local_collection_uuid(): 1526 raise AssertionError("Collection manifest_locator is from a remote cluster. 
Use save_new() to save it on the local cluster.") 1527 1528 self._my_block_manager().commit_all() 1529 1530 if merge: 1531 self.update() 1532 1533 text = self.manifest_text(strip=False) 1534 body['manifest_text'] = text 1535 1536 self._remember_api_response(self._my_api().collections().update( 1537 uuid=self._manifest_locator, 1538 body=body 1539 ).execute(num_retries=num_retries)) 1540 self._manifest_text = self._api_response["manifest_text"] 1541 self._portable_data_hash = self._api_response["portable_data_hash"] 1542 self.set_committed(True) 1543 elif body: 1544 self._remember_api_response(self._my_api().collections().update( 1545 uuid=self._manifest_locator, 1546 body=body 1547 ).execute(num_retries=num_retries)) 1548 1549 return self._manifest_text 1550 1551 1552 @must_be_writable 1553 @synchronized 1554 @retry_method 1555 def save_new( 1556 self, 1557 name: Optional[str]=None, 1558 create_collection_record: bool=True, 1559 owner_uuid: Optional[str]=None, 1560 properties: Optional[Properties]=None, 1561 storage_classes: Optional[StorageClasses]=None, 1562 trash_at: Optional[datetime.datetime]=None, 1563 ensure_unique_name: bool=False, 1564 num_retries: Optional[int]=None, 1565 preserve_version: bool=False, 1566 ): 1567 """Save collection to a new API record 1568 1569 This method finishes uploading new data blocks and (optionally) 1570 creates a new API collection record with the provided data. If a new 1571 record is created, this instance becomes associated with that record 1572 for future updates like `save()`. This method returns the saved 1573 collection manifest. 1574 1575 Arguments: 1576 1577 * name: str | None --- The `name` field to use on the new collection 1578 record. If not specified, a generic default name is generated. 1579 1580 * create_collection_record: bool --- If `True` (the default), creates a 1581 collection record on the API server. 
If `False`, the method finishes 1582 all data uploads and only returns the resulting collection manifest 1583 without sending it to the API server. 1584 1585 * owner_uuid: str | None --- The `owner_uuid` field to use on the 1586 new collection record. 1587 1588 * properties: dict[str, Any] | None --- The `properties` field to use on 1589 the new collection record. 1590 1591 * storage_classes: list[str] | None --- The 1592 `storage_classes_desired` field to use on the new collection record. 1593 1594 * trash_at: datetime.datetime | None --- The `trash_at` field to use 1595 on the new collection record. 1596 1597 * ensure_unique_name: bool --- This value is passed directly to the 1598 Arvados API when creating the collection record. If `True`, the API 1599 server may modify the submitted `name` to ensure the collection's 1600 `name`+`owner_uuid` combination is unique. If `False` (the default), 1601 if a collection already exists with this same `name`+`owner_uuid` 1602 combination, creating a collection record will raise a validation 1603 error. 1604 1605 * num_retries: int | None --- The number of times to retry reloading 1606 the collection's API record from the API server. If not specified, 1607 uses the `num_retries` provided when this instance was constructed. 1608 1609 * preserve_version: bool --- This value will be passed to directly 1610 to the underlying API call. If `True`, the Arvados API will 1611 preserve the versions of this collection both immediately before 1612 and after the update. If `True` when the API server is not 1613 configured with collection versioning, this method raises 1614 `arvados.errors.ArgumentError`. 
1615 """ 1616 if properties and type(properties) is not dict: 1617 raise errors.ArgumentError("properties must be dictionary type.") 1618 1619 if storage_classes and type(storage_classes) is not list: 1620 raise errors.ArgumentError("storage_classes must be list type.") 1621 1622 if trash_at and type(trash_at) is not datetime.datetime: 1623 raise errors.ArgumentError("trash_at must be datetime type.") 1624 1625 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1626 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1627 1628 if self._has_remote_blocks: 1629 # Copy any remote blocks to the local cluster. 1630 self._copy_remote_blocks(remote_blocks={}) 1631 self._has_remote_blocks = False 1632 1633 if storage_classes: 1634 self._storage_classes_desired = storage_classes 1635 1636 self._my_block_manager().commit_all() 1637 text = self.manifest_text(strip=False) 1638 1639 if create_collection_record: 1640 if name is None: 1641 name = "New collection" 1642 ensure_unique_name = True 1643 1644 body = {"manifest_text": text, 1645 "name": name, 1646 "replication_desired": self.replication_desired} 1647 if owner_uuid: 1648 body["owner_uuid"] = owner_uuid 1649 if properties: 1650 body["properties"] = properties 1651 if self.storage_classes_desired(): 1652 body["storage_classes_desired"] = self.storage_classes_desired() 1653 if trash_at: 1654 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1655 body["trash_at"] = t 1656 if preserve_version: 1657 body["preserve_version"] = preserve_version 1658 1659 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) 1660 text = self._api_response["manifest_text"] 1661 1662 self._manifest_locator = self._api_response["uuid"] 1663 self._portable_data_hash = self._api_response["portable_data_hash"] 1664 1665 self._manifest_text = text 1666 
self.set_committed(True) 1667 1668 return text 1669 1670 _token_re = re.compile(r'(\S+)(\s+|$)') 1671 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') 1672 _segment_re = re.compile(r'(\d+):(\d+):(\S+)') 1673 1674 def _unescape_manifest_path(self, path): 1675 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) 1676 1677 @synchronized 1678 def _import_manifest(self, manifest_text): 1679 """Import a manifest into a `Collection`. 1680 1681 :manifest_text: 1682 The manifest text to import from. 1683 1684 """ 1685 if len(self) > 0: 1686 raise ArgumentError("Can only import manifest into an empty collection") 1687 1688 STREAM_NAME = 0 1689 BLOCKS = 1 1690 SEGMENTS = 2 1691 1692 stream_name = None 1693 state = STREAM_NAME 1694 1695 for token_and_separator in self._token_re.finditer(manifest_text): 1696 tok = token_and_separator.group(1) 1697 sep = token_and_separator.group(2) 1698 1699 if state == STREAM_NAME: 1700 # starting a new stream 1701 stream_name = self._unescape_manifest_path(tok) 1702 blocks = [] 1703 segments = [] 1704 streamoffset = 0 1705 state = BLOCKS 1706 self.find_or_create(stream_name, COLLECTION) 1707 continue 1708 1709 if state == BLOCKS: 1710 block_locator = self._block_re.match(tok) 1711 if block_locator: 1712 blocksize = int(block_locator.group(1)) 1713 blocks.append(Range(tok, streamoffset, blocksize, 0)) 1714 streamoffset += blocksize 1715 else: 1716 state = SEGMENTS 1717 1718 if state == SEGMENTS: 1719 file_segment = self._segment_re.match(tok) 1720 if file_segment: 1721 pos = int(file_segment.group(1)) 1722 size = int(file_segment.group(2)) 1723 name = self._unescape_manifest_path(file_segment.group(3)) 1724 if name.split('/')[-1] == '.': 1725 # placeholder for persisting an empty directory, not a real file 1726 if len(name) > 2: 1727 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) 1728 else: 1729 filepath = os.path.join(stream_name, name) 1730 try: 1731 afile = self.find_or_create(filepath, 
FILE) 1732 except IOError as e: 1733 if e.errno == errno.ENOTDIR: 1734 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None 1735 else: 1736 raise e from None 1737 if isinstance(afile, ArvadosFile): 1738 afile.add_segment(blocks, pos, size) 1739 else: 1740 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) 1741 else: 1742 # error! 1743 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok) 1744 1745 if sep == "\n": 1746 stream_name = None 1747 state = STREAM_NAME 1748 1749 self.set_committed(True) 1750 1751 @synchronized 1752 def notify( 1753 self, 1754 event: ChangeType, 1755 collection: 'RichCollectionBase', 1756 name: str, 1757 item: CollectionItem, 1758 ) -> None: 1759 if self._callback: 1760 self._callback(event, collection, name, item)
Read and manipulate an Arvados collection
This class provides a high-level interface to create, read, and update Arvados collections and their contents. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.
    def __init__(self, manifest_locator_or_text: Optional[str]=None,
                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
                 keep_client: Optional['arvados.keep.KeepClient']=None,
                 num_retries: int=10,
                 parent: Optional['Collection']=None,
                 apiconfig: Optional[Mapping[str, str]]=None,
                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
                 replication_desired: Optional[int]=None,
                 storage_classes_desired: Optional[List[str]]=None,
                 put_threads: Optional[int]=None):
        """Initialize a Collection object

        Arguments:

        * manifest_locator_or_text: str | None --- This string can contain a
          collection manifest text, portable data hash, or UUID. When given a
          portable data hash or UUID, this instance will load a collection
          record from the API server. Otherwise, this instance will represent a
          new collection without an API server record. The default value `None`
          instantiates a new collection with an empty manifest.

        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
          Arvados API client object this instance uses to make requests. If
          none is given, this instance creates its own client using the
          settings from `apiconfig` (see below). If your client instantiates
          many Collection objects, you can help limit memory utilization by
          calling `arvados.api.api` to construct an
          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
          for every Collection.

        * keep_client: arvados.keep.KeepClient | None --- The Keep client
          object this instance uses to make requests. If none is given, this
          instance creates its own client using its `api_client`.

        * num_retries: int --- The number of times that client requests are
          retried. Default 10.

        * parent: arvados.collection.Collection | None --- The parent Collection
          object of this instance, if any. This argument is primarily used by
          other Collection methods; user client code shouldn't need to use it.

        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
          Collection object constructs one from these settings. If no
          mapping is provided, calls `arvados.config.settings` to get these
          parameters from user configuration.

        * block_manager: arvados.arvfile._BlockManager | None --- The
          _BlockManager object used by this instance to coordinate reading
          and writing Keep data blocks. If none is given, this instance
          constructs its own. This argument is primarily used by other
          Collection methods; user client code shouldn't need to use it.

        * replication_desired: int | None --- This controls both the value of
          the `replication_desired` field on API collection records saved by
          this class, as well as the number of Keep services that the object
          writes new data blocks to. If none is given, uses the default value
          configured for the cluster.

        * storage_classes_desired: list[str] | None --- This controls both
          the value of the `storage_classes_desired` field on API collection
          records saved by this class, as well as selecting which specific
          Keep services the object writes new data blocks to. If none is
          given, defaults to an empty list.

        * put_threads: int | None --- The number of threads to run
          simultaneously to upload data blocks to Keep. This value is used when
          building a new `block_manager`. It is unused when a `block_manager`
          is provided.
        """

        if storage_classes_desired and type(storage_classes_desired) is not list:
            raise errors.ArgumentError("storage_classes_desired must be list type.")

        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client

        # Use the keep client from ThreadSafeApiCache
        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
            self._keep_client = self._api_client.keep

        self._block_manager = block_manager
        self.replication_desired = replication_desired
        self._storage_classes_desired = storage_classes_desired
        self.put_threads = put_threads

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._portable_data_hash = None
        self._api_response = None
        self._past_versions = set()

        self.lock = threading.RLock()
        self.events = None

        if manifest_locator_or_text:
            # Classify the argument by pattern: Keep locator (PDH), collection
            # UUID, or literal manifest text.
            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
                if not self._has_local_collection_uuid():
                    self._has_remote_blocks = True
            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
                # '+R' hints at remote block locators in the manifest.
                if '+R' in self._manifest_text:
                    self._has_remote_blocks = True
            else:
                raise errors.ArgumentError(
                    "Argument to CollectionReader is not a manifest or a collection UUID")

            try:
                self._populate()
            except errors.SyntaxError as e:
                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
Initialize a Collection object
Arguments:
manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value
None
instantiates a new collection with an empty manifest.

api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from `apiconfig` (see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling `arvados.api.api` to construct an `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client` for every Collection.

keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its `api_client`.

num_retries: int — The number of times that client requests are retried. Default 10.
parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
apiconfig: Mapping[str, str] | None — A mapping with entries for `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the Collection object constructs one from these settings. If no mapping is provided, calls `arvados.config.settings` to get these parameters from user configuration.

block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn't need to use it.
replication_desired: int | None — This controls both the value of the `replication_desired` field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.

storage_classes_desired: list[str] | None — This controls both the value of the `storage_classes_desired` field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.

put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new `block_manager`. It is unused when a `block_manager` is provided.
1165 def storage_classes_desired(self) -> List[str]: 1166 """Get this collection's `storage_classes_desired` value""" 1167 return self._storage_classes_desired or []
Get this collection's `storage_classes_desired` value
Get this collection’s root collection object
If you open a subcollection with `Collection.find`, calling this method on that subcollection returns the source Collection object.
1172 def get_properties(self) -> Properties: 1173 """Get this collection's properties 1174 1175 This method always returns a dict. If this collection object does not 1176 have an associated API record, or that record does not have any 1177 properties set, this method returns an empty dict. 1178 """ 1179 if self._api_response and self._api_response["properties"]: 1180 return self._api_response["properties"] 1181 else: 1182 return {}
Get this collection’s properties
This method always returns a dict. If this collection object does not have an associated API record, or that record does not have any properties set, this method returns an empty dict.
1184 def get_trash_at(self) -> Optional[datetime.datetime]: 1185 """Get this collection's `trash_at` field 1186 1187 This method parses the `trash_at` field of the collection's API 1188 record and returns a datetime from it. If that field is not set, or 1189 this collection object does not have an associated API record, 1190 returns None. 1191 """ 1192 if self._api_response and self._api_response["trash_at"]: 1193 try: 1194 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1195 except ValueError: 1196 return None 1197 else: 1198 return None
Get this collection's `trash_at` field

This method parses the `trash_at` field of the collection's API record and returns a datetime from it. If that field is not set, or this collection object does not have an associated API record, returns None.
Get the name of the manifest stream represented by this collection
If you open a subcollection with `Collection.find`, calling this method on that subcollection returns the name of the stream you opened.
Indicate whether this collection object can be modified
This method returns `False` if this object is a `CollectionReader`, else `True`.
1206 @synchronized 1207 def known_past_version( 1208 self, 1209 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] 1210 ) -> bool: 1211 """Indicate whether an API record for this collection has been seen before 1212 1213 As this collection object loads records from the API server, it records 1214 their `modified_at` and `portable_data_hash` fields. This method accepts 1215 a 2-tuple with values for those fields, and returns `True` if the 1216 combination was previously loaded. 1217 """ 1218 return modified_at_and_portable_data_hash in self._past_versions
Indicate whether an API record for this collection has been seen before
As this collection object loads records from the API server, it records their `modified_at` and `portable_data_hash` fields. This method accepts a 2-tuple with values for those fields, and returns `True` if the combination was previously loaded.
    @synchronized
    @retry_method
    def update(
        self,
        other: Optional['Collection']=None,
        num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it. If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
        """
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._remember_api_response(response)
                other = CollectionReader(response["manifest_text"])
        # Diff this instance's baseline manifest against `other`, then apply
        # the resulting changes to this instance.
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        self._manifest_text = self.manifest_text()
Merge another collection’s contents into this one
This method compares the manifest of this collection instance with another, then updates this instance’s manifest with changes from the other, renaming files to flag conflicts where necessary.
When called without any arguments, this method reloads the collection's API record, and updates this instance with any changes that have appeared server-side. If this instance does not have a corresponding API record, this method raises `arvados.errors.ArgumentError`.
Arguments:

* other: Collection | None — The collection whose contents should be merged into this instance. When not provided, this method reloads this collection's API record and constructs a Collection object from it. If this instance does not have a corresponding API record, this method raises `arvados.errors.ArgumentError`.

* num_retries: int | None — The number of times to retry reloading the collection's API record from the API server. If not specified, uses the `num_retries` provided when this instance was constructed.
1355 def stop_threads(self) -> None: 1356 """Stop background Keep upload/download threads""" 1357 if self._block_manager is not None: 1358 self._block_manager.stop_threads()
Stop background Keep upload/download threads
1360 @synchronized 1361 def manifest_locator(self) -> Optional[str]: 1362 """Get this collection's manifest locator, if any 1363 1364 * If this collection instance is associated with an API record with a 1365 UUID, return that. 1366 * Otherwise, if this collection instance was loaded from an API record 1367 by portable data hash, return that. 1368 * Otherwise, return `None`. 1369 """ 1370 return self._manifest_locator
Get this collection's manifest locator, if any

- If this collection instance is associated with an API record with a UUID, return that.
- Otherwise, if this collection instance was loaded from an API record by portable data hash, return that.
- Otherwise, return `None`.
1372 @synchronized 1373 def clone( 1374 self, 1375 new_parent: Optional['Collection']=None, 1376 new_name: Optional[str]=None, 1377 readonly: bool=False, 1378 new_config: Optional[Mapping[str, str]]=None, 1379 ) -> 'Collection': 1380 """Create a Collection object with the same contents as this instance 1381 1382 This method creates a new Collection object with contents that match 1383 this instance's. The new collection will not be associated with any API 1384 record. 1385 1386 Arguments: 1387 1388 * new_parent: arvados.collection.Collection | None --- This value is 1389 passed to the new Collection's constructor as the `parent` 1390 argument. 1391 1392 * new_name: str | None --- This value is unused. 1393 1394 * readonly: bool --- If this value is true, this method constructs and 1395 returns a `CollectionReader`. Otherwise, it returns a mutable 1396 `Collection`. Default `False`. 1397 1398 * new_config: Mapping[str, str] | None --- This value is passed to the 1399 new Collection's constructor as `apiconfig`. If no value is provided, 1400 defaults to the configuration passed to this instance's constructor. 1401 """ 1402 if new_config is None: 1403 new_config = self._config 1404 if readonly: 1405 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1406 else: 1407 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1408 1409 newcollection._clonefrom(self) 1410 return newcollection
Create a Collection object with the same contents as this instance
This method creates a new Collection object with contents that match this instance’s. The new collection will not be associated with any API record.
Arguments:
new_parent: Collection | None — This value is passed to the new Collection’s constructor as the
parent
argument.new_name: str | None — This value is unused.
readonly: bool — If this value is true, this method constructs and returns a
CollectionReader
. Otherwise, it returns a mutableCollection
. DefaultFalse
.new_config: Mapping[str, str] | None — This value is passed to the new Collection’s constructor as
apiconfig
. If no value is provided, defaults to the configuration passed to this instance’s constructor.
1412 @synchronized 1413 def api_response(self) -> Optional[Dict[str, Any]]: 1414 """Get this instance's associated API record 1415 1416 If this Collection instance has an associated API record, return it. 1417 Otherwise, return `None`. 1418 """ 1419 return self._api_response
Get this instance's associated API record

If this Collection instance has an associated API record, return it. Otherwise, return `None`.
1421 def find_or_create( 1422 self, 1423 path: str, 1424 create_type: CreateType, 1425 ) -> CollectionItem: 1426 if path == ".": 1427 return self 1428 else: 1429 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
Get the item at the given path, creating it if necessary

If `path` refers to a stream in this collection, returns a corresponding `Subcollection` object. If `path` refers to a file in this collection, returns a corresponding `arvados.arvfile.ArvadosFile` object. If `path` does not exist in this collection, then this method creates a new object and returns it, creating parent streams as needed. The type of object created is determined by the value of `create_type`.

Arguments:

* path: str — The path to find or create within this collection.

* create_type: Literal[COLLECTION, FILE] — The type of object to create at `path` if one does not exist. Passing `COLLECTION` creates a stream and returns the corresponding `Subcollection`. Passing `FILE` creates a new file and returns the corresponding `arvados.arvfile.ArvadosFile`.
1431 def find(self, path: str) -> CollectionItem: 1432 if path == ".": 1433 return self 1434 else: 1435 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
Get the item at the given path

If `path` refers to a stream in this collection, returns a corresponding `Subcollection` object. If `path` refers to a file in this collection, returns a corresponding `arvados.arvfile.ArvadosFile` object. If `path` does not exist in this collection, this method raises `NotADirectoryError`.

Arguments:

- path: str — The path to find within this collection.
1437 def remove(self, path: str, recursive: bool=False) -> None: 1438 if path == ".": 1439 raise errors.ArgumentError("Cannot remove '.'") 1440 else: 1441 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
Remove the file or stream at `path`

Arguments:

* path: str — The path of the item to remove from the collection.

* recursive: bool — Controls the method's behavior if `path` refers to a nonempty stream. If `False` (the default), this method raises `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all items under the stream.
@must_be_writable
@synchronized
@retry_method
def save(
        self,
        properties: Optional[Properties]=None,
        storage_classes: Optional[StorageClasses]=None,
        trash_at: Optional[datetime.datetime]=None,
        merge: bool=True,
        num_retries: Optional[int]=None,
        preserve_version: bool=False,
) -> str:
    """Save collection to an existing API record

    Updates this instance's corresponding API record to match the
    instance's state, and returns the saved collection manifest. If this
    instance does not have a corresponding API record yet, raises
    `AssertionError`. (To create a new API record, use
    `Collection.save_new`.)

    Arguments:

    * properties: dict[str, Any] | None --- If provided, completely
      replaces the properties on the API record.

    * storage_classes: list[str] | None --- If provided, saved to the API
      record's `storage_classes_desired` field, and remembered on this
      instance for any changes that follow.

    * trash_at: datetime.datetime | None --- If provided, saved to the
      API record's `trash_at` field.

    * merge: bool --- If `True` (the default), first reload this
      collection's API record and merge any new contents into this
      instance before saving. See `Collection.update` for details.

    * num_retries: int | None --- The number of times to retry API
      requests. If not specified, uses the `num_retries` provided when
      this instance was constructed.

    * preserve_version: bool --- Passed directly to the underlying API
      call. If `True`, the Arvados API preserves the versions of this
      collection both immediately before and after the update. Raises
      `arvados.errors.ArgumentError` if `True` when the API server is
      not configured with collection versioning.
    """
    # Validate argument types up front, mirroring the API's expectations.
    if properties and type(properties) is not dict:
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and type(storage_classes) is not list:
        raise errors.ArgumentError("storage_classes must be list type.")
    if storage_classes:
        self._storage_classes_desired = storage_classes

    if trash_at and type(trash_at) is not datetime.datetime:
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    update_body = {}
    if properties:
        update_body["properties"] = properties
    if self.storage_classes_desired():
        update_body["storage_classes_desired"] = self.storage_classes_desired()
    if trash_at:
        # NOTE(review): the datetime is formatted with a literal 'Z'
        # suffix regardless of its tzinfo — assumes a UTC value; confirm
        # against callers.
        update_body["trash_at"] = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    if preserve_version:
        update_body["preserve_version"] = preserve_version

    if not self.committed():
        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False
        if not self._has_collection_uuid():
            raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
        elif not self._has_local_collection_uuid():
            raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

        self._my_block_manager().commit_all()

        if merge:
            self.update()

        update_body['manifest_text'] = self.manifest_text(strip=False)

        self._remember_api_response(self._my_api().collections().update(
            uuid=self._manifest_locator,
            body=update_body,
        ).execute(num_retries=num_retries))
        self._manifest_text = self._api_response["manifest_text"]
        self._portable_data_hash = self._api_response["portable_data_hash"]
        self.set_committed(True)
    elif update_body:
        # Nothing new to upload, but metadata fields still need saving.
        self._remember_api_response(self._my_api().collections().update(
            uuid=self._manifest_locator,
            body=update_body,
        ).execute(num_retries=num_retries))

    return self._manifest_text
Save collection to an existing API record
This method updates the instance’s corresponding API record to match
the instance’s state. If this instance does not have a corresponding API
record yet, raises AssertionError
. (To create a new API record, use
Collection.save_new
.) This method returns the saved collection
manifest.
Arguments:
properties: dict[str, Any] | None — If provided, the API record will be updated with these properties. Note this will completely replace any existing properties.
storage_classes: list[str] | None — If provided, the API record will be updated with this value in the
storage_classes_desired
field. This value will also be saved on the instance and used for any changes that follow.trash_at: datetime.datetime | None — If provided, the API record will be updated with this value in the
trash_at
field.merge: bool — If
True
(the default), this method will first reload this collection’s API record, and merge any new contents into this instance before saving changes. SeeCollection.update
for details.num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the
num_retries
provided when this instance was constructed.preserve_version: bool — This value will be passed to directly to the underlying API call. If
True
, the Arvados API will preserve the versions of this collection both immediately before and after the update. IfTrue
when the API server is not configured with collection versioning, this method raisesarvados.errors.ArgumentError
.
@must_be_writable
@synchronized
@retry_method
def save_new(
        self,
        name: Optional[str]=None,
        create_collection_record: bool=True,
        owner_uuid: Optional[str]=None,
        properties: Optional[Properties]=None,
        storage_classes: Optional[StorageClasses]=None,
        trash_at: Optional[datetime.datetime]=None,
        ensure_unique_name: bool=False,
        num_retries: Optional[int]=None,
        preserve_version: bool=False,
):
    """Save collection to a new API record

    Finishes uploading new data blocks and (optionally) creates a new API
    collection record with the provided data. If a new record is created,
    this instance becomes associated with that record for future updates
    like `save()`. Returns the saved collection manifest.

    Arguments:

    * name: str | None --- The `name` field to use on the new collection
      record. If not specified, a generic default name is generated and
      `ensure_unique_name` is forced on.

    * create_collection_record: bool --- If `True` (the default), creates
      a collection record on the API server. If `False`, only finishes
      data uploads and returns the resulting manifest without contacting
      the API server.

    * owner_uuid: str | None --- The `owner_uuid` field to use on the new
      collection record.

    * properties: dict[str, Any] | None --- The `properties` field to use
      on the new collection record.

    * storage_classes: list[str] | None --- The `storage_classes_desired`
      field to use on the new collection record.

    * trash_at: datetime.datetime | None --- The `trash_at` field to use
      on the new collection record.

    * ensure_unique_name: bool --- Passed directly to the Arvados API
      when creating the collection record. If `True`, the API server may
      modify the submitted `name` to ensure the collection's
      `name`+`owner_uuid` combination is unique. If `False` (the
      default), a clashing combination raises a validation error.

    * num_retries: int | None --- The number of times to retry API
      requests. If not specified, uses the `num_retries` provided when
      this instance was constructed.

    * preserve_version: bool --- Passed directly to the underlying API
      call. If `True`, the Arvados API preserves the versions of this
      collection both immediately before and after the update. Raises
      `arvados.errors.ArgumentError` if `True` when the API server is
      not configured with collection versioning.
    """
    # Validate argument types up front, mirroring the API's expectations.
    if properties and type(properties) is not dict:
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and type(storage_classes) is not list:
        raise errors.ArgumentError("storage_classes must be list type.")

    if trash_at and type(trash_at) is not datetime.datetime:
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    if self._has_remote_blocks:
        # Copy any remote blocks to the local cluster.
        self._copy_remote_blocks(remote_blocks={})
        self._has_remote_blocks = False

    if storage_classes:
        self._storage_classes_desired = storage_classes

    self._my_block_manager().commit_all()
    manifest = self.manifest_text(strip=False)

    if create_collection_record:
        if name is None:
            name = "New collection"
            ensure_unique_name = True

        record_body = {
            "manifest_text": manifest,
            "name": name,
            "replication_desired": self.replication_desired,
        }
        if owner_uuid:
            record_body["owner_uuid"] = owner_uuid
        if properties:
            record_body["properties"] = properties
        if self.storage_classes_desired():
            record_body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            # NOTE(review): formatted with a literal 'Z' suffix regardless
            # of tzinfo — assumes a UTC value; confirm against callers.
            record_body["trash_at"] = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        if preserve_version:
            record_body["preserve_version"] = preserve_version

        self._remember_api_response(self._my_api().collections().create(
            ensure_unique_name=ensure_unique_name,
            body=record_body).execute(num_retries=num_retries))
        manifest = self._api_response["manifest_text"]

        self._manifest_locator = self._api_response["uuid"]
        self._portable_data_hash = self._api_response["portable_data_hash"]

        self._manifest_text = manifest
        self.set_committed(True)

    return manifest
Save collection to a new API record
This method finishes uploading new data blocks and (optionally)
creates a new API collection record with the provided data. If a new
record is created, this instance becomes associated with that record
for future updates like save()
. This method returns the saved
collection manifest.
Arguments:
name: str | None — The
name
field to use on the new collection record. If not specified, a generic default name is generated.create_collection_record: bool — If
True
(the default), creates a collection record on the API server. IfFalse
, the method finishes all data uploads and only returns the resulting collection manifest without sending it to the API server.owner_uuid: str | None — The
owner_uuid
field to use on the new collection record.properties: dict[str, Any] | None — The
properties
field to use on the new collection record.storage_classes: list[str] | None — The
storage_classes_desired
field to use on the new collection record.trash_at: datetime.datetime | None — The
trash_at
field to use on the new collection record.ensure_unique_name: bool — This value is passed directly to the Arvados API when creating the collection record. If
True
, the API server may modify the submittedname
to ensure the collection’sname
+owner_uuid
combination is unique. IfFalse
(the default), if a collection already exists with this samename
+owner_uuid
combination, creating a collection record will raise a validation error.num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the
num_retries
provided when this instance was constructed.preserve_version: bool — This value will be passed to directly to the underlying API call. If
True
, the Arvados API will preserve the versions of this collection both immediately before and after the update. IfTrue
when the API server is not configured with collection versioning, this method raisesarvados.errors.ArgumentError
.
1751 @synchronized 1752 def notify( 1753 self, 1754 event: ChangeType, 1755 collection: 'RichCollectionBase', 1756 name: str, 1757 item: CollectionItem, 1758 ) -> None: 1759 if self._callback: 1760 self._callback(event, collection, name, item)
Notify any subscribed callback about a change to this collection
If a callback has been registered with RichCollectionBase.subscribe
,
it will be called with information about a change to this collection.
Then this notification will be propagated to this collection’s root.
Arguments:
event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.
collection: RichCollectionBase — The collection that was modified.
name: str — The name of the file or stream within
collection
that was modified.item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at
name
withincollection
.
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) inside an
    Arvados `Collection`. Instances are returned by `Collection.find` and
    provide the same API as `Collection`. Operations that work on the API
    collection record propagate to the parent `Collection` object.
    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock so the whole tree synchronizes
        # on one object.
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self) -> 'Collection':
        """Get the root `Collection` object this subcollection belongs to"""
        return self.parent.root_collection()

    def writable(self) -> bool:
        """Indicate whether this subcollection can be modified"""
        return self.root_collection().writable()

    def _my_api(self):
        # API client, Keep client, and block manager are all owned by the
        # root collection; delegate to it.
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this subcollection"""
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Create a `Subcollection` with the same contents as this one"""
        duplicate = Subcollection(new_parent, new_name)
        duplicate._clonefrom(self)
        return duplicate

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        # Flush pending writes and detach from the current parent before
        # attaching under the new one.
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Encode an empty directory as a stream with one empty \\056-named (".") file"""
        if len(self._items) == 0:
            return "%s %s 0:0:\\056\n" % (
                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
        return super(Subcollection, self)._get_manifest_text(
            stream_name, strip, normalize, only_committed)
Read and manipulate a stream/directory within an Arvados collection
This class represents a single stream (like a directory) within an Arvados
Collection
. It is returned by Collection.find
and provides the same API.
Operations that work on the API collection record propagate to the parent
Collection
object.
Get this collection’s root collection object
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the source Collection object.
Indicate whether this collection object can be modified
This method returns False
if this object is a CollectionReader
,
else True
.
Get the name of the manifest stream represented by this collection
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the name of the stream you opened.
Inherited Members
class CollectionReader(Collection):
    """Read-only `Collection` subclass

    This class never creates or updates any API collection records. Use it
    for additional code safety when you only need to read existing
    collections.
    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Contents never change after initialization, so real locking is
        # unnecessary overhead.
        self.lock = NoopLock()

        # Lazily populated for backwards compatibility with the old
        # CollectionReader all_streams()/all_files() API.
        self._streams = None

    def writable(self) -> bool:
        # Only writable while __init__ is populating the instance.
        return self._in_init

    def _populate_streams(orig_func):
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            # Populating self._streams copies the manifest, so defer it
            # until one of the deprecated stream APIs actually needs it.
            if self._streams is None:
                manifest = self._manifest_text
                if manifest:
                    self._streams = [
                        line.split()
                        for line in manifest.split("\n")
                        if line]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return wrapper

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def normalize(self):
        """Normalize the streams returned by `all_streams`"""
        by_stream = {}
        for stream in self.all_streams():
            for f in stream.all_files():
                stream_name, file_name = split(stream.name() + "/" + f.name())
                segments = by_stream.setdefault(stream_name, {}).setdefault(file_name, [])
                for r in f.segments:
                    segments.extend(stream.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(name, by_stream[name])
                         for name in sorted(by_stream)]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_files(self):
        for stream in self.all_streams():
            for f in stream.all_files():
                yield f
Read-only Collection
subclass
This class will never create or update any API collection records. You can use this class for additional code safety when you only need to read existing collections.
1835 def __init__(self, manifest_locator_or_text, *args, **kwargs): 1836 self._in_init = True 1837 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs) 1838 self._in_init = False 1839 1840 # Forego any locking since it should never change once initialized. 1841 self.lock = NoopLock() 1842 1843 # Backwards compatability with old CollectionReader 1844 # all_streams() and all_files() 1845 self._streams = None
Initialize a Collection object
Arguments:
manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value
None
instantiates a new collection with an empty manifest.api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from
apiconfig
(see below). If your client instantiates many Collection objects, you can help limit memory utilization by callingarvados.api.api
to construct anarvados.safeapi.ThreadSafeApiCache
, and use that as theapi_client
for every Collection.keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its
api_client
.num_retries: int — The number of times that client requests are retried. Default 10.
parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
apiconfig: Mapping[str, str] | None — A mapping with entries for
ARVADOS_API_HOST
,ARVADOS_API_TOKEN
, and optionallyARVADOS_API_HOST_INSECURE
. When noapi_client
is provided, the Collection object constructs one from these settings. If no mapping is provided, callsarvados.config.settings
to get these parameters from user configuration.block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
replication_desired: int | None — This controls both the value of the
replication_desired
field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.storage_classes_desired: list[str] | None — This controls both the value of the
storage_classes_desired
field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new
block_manager
. It is unused when ablock_manager
is provided.
Indicate whether this collection object can be modified
This method returns False
if this object is a CollectionReader
,
else True
.
1864 @arvados.util._deprecated('3.0', 'Collection iteration') 1865 @_populate_streams 1866 def normalize(self): 1867 """Normalize the streams returned by `all_streams`""" 1868 streams = {} 1869 for s in self.all_streams(): 1870 for f in s.all_files(): 1871 streamname, filename = split(s.name() + "/" + f.name()) 1872 if streamname not in streams: 1873 streams[streamname] = {} 1874 if filename not in streams[streamname]: 1875 streams[streamname][filename] = [] 1876 for r in f.segments: 1877 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size)) 1878 1879 self._streams = [normalize_stream(s, streams[s]) 1880 for s in sorted(streams)]
Normalize the streams returned by all_streams
1882 @arvados.util._deprecated('3.0', 'Collection iteration') 1883 @_populate_streams 1884 def all_streams(self): 1885 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries) 1886 for s in self._streams]
1888 @arvados.util._deprecated('3.0', 'Collection iteration') 1889 @_populate_streams 1890 def all_files(self): 1891 for s in self.all_streams(): 1892 for f in s.all_files(): 1893 yield f
Inherited Members
class CollectionWriter(CollectionBase):
    """Create a new collection from scratch

    .. WARNING:: Deprecated
        This class is deprecated. Prefer `arvados.collection.Collection`
        instead.
    """

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        # Fall back to 2 copies when the caller doesn't specify replication.
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        # Pending data not yet flushed to Keep, kept as a list of byte
        # strings plus a running total length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # Bookkeeping for the manifest stream currently being assembled:
        # finished files ([pos, size, name] triples), total bytes written,
        # block locators, and the stream name (default '.').
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        # Streams already finalized by finish_current_stream().
        self._finished_streams = []
        # Work-queue state; see do_queued_work() for how these interact.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        # File-like object most recently returned by open(), if any; used
        # to enforce that only one file is open at a time.
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        # Only finalize (upload the manifest) on a clean exit; leave state
        # untouched if the with-block raised.
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #       Collection.
        # * _queued_dirents: Entries under the current directory
        #       (_queued_trees[0]) that we want to write or recurse through.
        #       This may contain files from subdirectories if
        #       max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #       streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        """Write the queued file to the Collection in Keep-block-sized chunks."""
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        # Only close the file object if _queue_file opened it itself.
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        """Process queued directory entries for the current tree.

        Queues subdirectories as new trees, or queues one file and stops
        (the file becomes the smallest unit of pending work).
        """
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                # Stop after queueing one file so do_queued_work() writes it
                # before we continue with the remaining dirents.
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        """List the next queued directory tree and queue its entries."""
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_manifest_depth == 0 means "flatten everything below this
        # directory into one stream" (unbounded listing depth); otherwise
        # list only the immediate children and recurse via _work_dirents.
        d = arvados.util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        """Queue one file (path or file-like object) for writing."""
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        """Queue directory entries (sorted for deterministic manifests)."""
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        """Queue a directory to be written as a manifest stream."""
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (path or file-like object) to the Collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a local directory tree to the Collection as streams."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data to the current file.

        Accepts bytes, str (encoded as UTF-8), or any iterable of either
        (written element by element).
        """
        if isinstance(newdata, bytes):
            pass
        elif isinstance(newdata, str):
            newdata = newdata.encode()
        elif hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Upload full Keep blocks as soon as we have them buffered.
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy:

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                u"can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        """Upload one buffered block (up to KEEP_BLOCK_SIZE bytes) to Keep.

        Any remainder beyond the block size stays in the buffer.
        """
        data_buffer = b''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and start a new one with the given name."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Set the current file name, rejecting names a manifest can't encode."""
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file currently being written, or None."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's [pos, size, name] in the stream."""
        if self._current_file_name is None:
            # No name is fine only if no bytes were written since the last
            # file ended; otherwise the data would be unaddressable.
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and start a new one with the given name."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Set the current stream name ('' is normalized to '.')."""
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace: '%s'" %
                (newstreamname))
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        """Return the name of the stream currently being written, or None."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Flush buffered data and record the current stream, then reset state."""
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            # Empty stream: nothing to record.
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            # A manifest stream needs at least one locator, even for
            # zero-length files.
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest in Keep and return its locator.

        This is useful for storing manifest fragments (task outputs)
        temporarily in Keep during a Crunch job.

        In other cases you should make a collection instead, by
        sending manifest_text() to the API server's "create
        collection" endpoint.
        """
        return self._my_keep().put(self.manifest_text().encode(),
                                   copies=self.replication)

    def portable_data_hash(self):
        """Return the portable data hash (md5+size) of the manifest."""
        stripped = self.stripped_manifest().encode()
        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

    def manifest_text(self):
        """Finalize the current stream and return the full manifest text."""
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            # Stream names in a manifest must start with './' unless they
            # are exactly '.'.
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            # Spaces are escaped as \040 per the manifest format.
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        """Return the block locators from every finished stream."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

    def save_new(self, name=None):
        """Create a new collection record on the API server and return it."""
        return self._api_client.collections().create(
            ensure_unique_name=True,
            body={
                'name': name,
                'manifest_text': self.manifest_text(),
            }).execute(num_retries=self.num_retries)
Create a new collection from scratch
1904 @arvados.util._deprecated('3.0', 'arvados.collection.Collection') 1905 def __init__(self, api_client=None, num_retries=0, replication=None): 1906 """Instantiate a CollectionWriter. 1907 1908 CollectionWriter lets you build a new Arvados Collection from scratch. 1909 Write files to it. The CollectionWriter will upload data to Keep as 1910 appropriate, and provide you with the Collection manifest text when 1911 you're finished. 1912 1913 Arguments: 1914 * api_client: The API client to use to look up Collections. If not 1915 provided, CollectionReader will build one from available Arvados 1916 configuration. 1917 * num_retries: The default number of times to retry failed 1918 service requests. Default 0. You may change this value 1919 after instantiation, but note those changes may not 1920 propagate to related objects like the Keep client. 1921 * replication: The number of copies of each block to store. 1922 If this argument is None or not supplied, replication is 1923 the server-provided default if available, otherwise 2. 1924 """ 1925 self._api_client = api_client 1926 self.num_retries = num_retries 1927 self.replication = (2 if replication is None else replication) 1928 self._keep_client = None 1929 self._data_buffer = [] 1930 self._data_buffer_len = 0 1931 self._current_stream_files = [] 1932 self._current_stream_length = 0 1933 self._current_stream_locators = [] 1934 self._current_stream_name = '.' 1935 self._current_file_name = None 1936 self._current_file_pos = 0 1937 self._finished_streams = [] 1938 self._close_file = None 1939 self._queued_file = None 1940 self._queued_dirents = deque() 1941 self._queued_trees = deque() 1942 self._last_open = None
Instantiate a CollectionWriter.
CollectionWriter lets you build a new Arvados Collection from scratch. Write files to it. The CollectionWriter will upload data to Keep as appropriate, and provide you with the Collection manifest text when you’re finished.
Arguments:
- api_client: The API client to use to look up Collections. If not provided, CollectionWriter will build one from available Arvados configuration.
- num_retries: The default number of times to retry failed service requests. Default 0. You may change this value after instantiation, but note those changes may not propagate to related objects like the Keep client.
- replication: The number of copies of each block to store. If this argument is None or not supplied, replication is the server-provided default if available, otherwise 2.
1948 def do_queued_work(self): 1949 # The work queue consists of three pieces: 1950 # * _queued_file: The file object we're currently writing to the 1951 # Collection. 1952 # * _queued_dirents: Entries under the current directory 1953 # (_queued_trees[0]) that we want to write or recurse through. 1954 # This may contain files from subdirectories if 1955 # max_manifest_depth == 0 for this directory. 1956 # * _queued_trees: Directories that should be written as separate 1957 # streams to the Collection. 1958 # This function handles the smallest piece of work currently queued 1959 # (current file, then current directory, then next directory) until 1960 # no work remains. The _work_THING methods each do a unit of work on 1961 # THING. _queue_THING methods add a THING to the work queue. 1962 while True: 1963 if self._queued_file: 1964 self._work_file() 1965 elif self._queued_dirents: 1966 self._work_dirents() 1967 elif self._queued_trees: 1968 self._work_trees() 1969 else: 1970 break
2038 def write(self, newdata): 2039 if isinstance(newdata, bytes): 2040 pass 2041 elif isinstance(newdata, str): 2042 newdata = newdata.encode() 2043 elif hasattr(newdata, '__iter__'): 2044 for s in newdata: 2045 self.write(s) 2046 return 2047 self._data_buffer.append(newdata) 2048 self._data_buffer_len += len(newdata) 2049 self._current_stream_length += len(newdata) 2050 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE: 2051 self.flush_data()
2053 def open(self, streampath, filename=None): 2054 """open(streampath[, filename]) -> file-like object 2055 2056 Pass in the path of a file to write to the Collection, either as a 2057 single string or as two separate stream name and file name arguments. 2058 This method returns a file-like object you can write to add it to the 2059 Collection. 2060 2061 You may only have one file object from the Collection open at a time, 2062 so be sure to close the object when you're done. Using the object in 2063 a with statement makes that easy: 2064 2065 with cwriter.open('./doc/page1.txt') as outfile: 2066 outfile.write(page1_data) 2067 with cwriter.open('./doc/page2.txt') as outfile: 2068 outfile.write(page2_data) 2069 """ 2070 if filename is None: 2071 streampath, filename = split(streampath) 2072 if self._last_open and not self._last_open.closed: 2073 raise errors.AssertionError( 2074 u"can't open '{}' when '{}' is still open".format( 2075 filename, self._last_open.name)) 2076 if streampath != self.current_stream_name(): 2077 self.start_new_stream(streampath) 2078 self.set_current_file_name(filename) 2079 self._last_open = _WriterFile(self, filename) 2080 return self._last_open
open(streampath[, filename]) -> file-like object
Pass in the path of a file to write to the Collection, either as a single string or as two separate stream name and file name arguments. This method returns a file-like object you can write to add it to the Collection.
You may only have one file object from the Collection open at a time, so be sure to close the object when you’re done. Using the object in a with statement makes that easy:
with cwriter.open('./doc/page1.txt') as outfile:
outfile.write(page1_data)
with cwriter.open('./doc/page2.txt') as outfile:
outfile.write(page2_data)
2082 def flush_data(self): 2083 data_buffer = b''.join(self._data_buffer) 2084 if data_buffer: 2085 self._current_stream_locators.append( 2086 self._my_keep().put( 2087 data_buffer[0:config.KEEP_BLOCK_SIZE], 2088 copies=self.replication)) 2089 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]] 2090 self._data_buffer_len = len(self._data_buffer[0])
2096 def set_current_file_name(self, newfilename): 2097 if re.search(r'[\t\n]', newfilename): 2098 raise errors.AssertionError( 2099 "Manifest filenames cannot contain whitespace: %s" % 2100 newfilename) 2101 elif re.search(r'\x00', newfilename): 2102 raise errors.AssertionError( 2103 "Manifest filenames cannot contain NUL characters: %s" % 2104 newfilename) 2105 self._current_file_name = newfilename
2110 def finish_current_file(self): 2111 if self._current_file_name is None: 2112 if self._current_file_pos == self._current_stream_length: 2113 return 2114 raise errors.AssertionError( 2115 "Cannot finish an unnamed file " + 2116 "(%d bytes at offset %d in '%s' stream)" % 2117 (self._current_stream_length - self._current_file_pos, 2118 self._current_file_pos, 2119 self._current_stream_name)) 2120 self._current_stream_files.append([ 2121 self._current_file_pos, 2122 self._current_stream_length - self._current_file_pos, 2123 self._current_file_name]) 2124 self._current_file_pos = self._current_stream_length 2125 self._current_file_name = None
2141 def finish_current_stream(self): 2142 self.finish_current_file() 2143 self.flush_data() 2144 if not self._current_stream_files: 2145 pass 2146 elif self._current_stream_name is None: 2147 raise errors.AssertionError( 2148 "Cannot finish an unnamed stream (%d bytes in %d files)" % 2149 (self._current_stream_length, len(self._current_stream_files))) 2150 else: 2151 if not self._current_stream_locators: 2152 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR) 2153 self._finished_streams.append([self._current_stream_name, 2154 self._current_stream_locators, 2155 self._current_stream_files]) 2156 self._current_stream_files = [] 2157 self._current_stream_length = 0 2158 self._current_stream_locators = [] 2159 self._current_stream_name = None 2160 self._current_file_pos = 0 2161 self._current_file_name = None
2163 def finish(self): 2164 """Store the manifest in Keep and return its locator. 2165 2166 This is useful for storing manifest fragments (task outputs) 2167 temporarily in Keep during a Crunch job. 2168 2169 In other cases you should make a collection instead, by 2170 sending manifest_text() to the API server's "create 2171 collection" endpoint. 2172 """ 2173 return self._my_keep().put(self.manifest_text().encode(), 2174 copies=self.replication)
Store the manifest in Keep and return its locator.
This is useful for storing manifest fragments (task outputs) temporarily in Keep during a Crunch job.
In other cases you should make a collection instead, by sending manifest_text() to the API server’s “create collection” endpoint.
2180 def manifest_text(self): 2181 self.finish_current_stream() 2182 manifest = '' 2183 2184 for stream in self._finished_streams: 2185 if not re.search(r'^\.(/.*)?$', stream[0]): 2186 manifest += './' 2187 manifest += stream[0].replace(' ', '\\040') 2188 manifest += ' ' + ' '.join(stream[1]) 2189 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2]) 2190 manifest += "\n" 2191 2192 return manifest
Inherited Members
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can serialize internal state to disk

    .. WARNING:: Deprecated
        This class is deprecated. Prefer `arvados.collection.Collection`
        instead.
    """

    # Attributes saved by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, **kwargs):
        # Maps dependency paths to the stat tuples recorded when they were
        # queued; used by check_dependencies() to detect staleness.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                # Reopen the partially-written file and seek back to where
                # the previous writer left off.
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any dependency file changed.

        A dependency is stale if it is no longer a regular file, cannot be
        stat'd, or its mtime or size differs from when it was recorded.
        """
        for path, orig_stat in listitems(self._dependencies):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(u"{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    u"failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError(u"{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a serializable snapshot of the writer's state.

        copy_func is applied to each attribute value, e.g. to deep-copy
        mutable state before further writes happen.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            # Record the file's real path and current offset so from_state()
            # can reopen it and resume at the same position.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        """Queue a file path for writing, recording it as a dependency.

        Unlike the parent class, only accepts file paths (not file-like
        objects), since resuming requires reopening the file later.
        """
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError(u"{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            # Don't raise yet: the open() in the superclass may still succeed
            # (and give a better picture); stat_error is reported below only
            # if it matters.
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                u"could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                u"{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Append data to the current file; requires a queued source file."""
        if self._queued_file is None:
            # Resumability depends on every byte being traceable to a file
            # on disk, so free-form writes are rejected.
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
CollectionWriter that can serialize internal state to disk
2223 @arvados.util._deprecated('3.0', 'arvados.collection.Collection') 2224 def __init__(self, api_client=None, **kwargs): 2225 self._dependencies = {} 2226 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
2228 @classmethod 2229 def from_state(cls, state, *init_args, **init_kwargs): 2230 # Try to build a new writer from scratch with the given state. 2231 # If the state is not suitable to resume (because files have changed, 2232 # been deleted, aren't predictable, etc.), raise a 2233 # StaleWriterStateError. Otherwise, return the initialized writer. 2234 # The caller is responsible for calling writer.do_queued_work() 2235 # appropriately after it's returned. 2236 writer = cls(*init_args, **init_kwargs) 2237 for attr_name in cls.STATE_PROPS: 2238 attr_value = state[attr_name] 2239 attr_class = getattr(writer, attr_name).__class__ 2240 # Coerce the value into the same type as the initial value, if 2241 # needed. 2242 if attr_class not in (type(None), attr_value.__class__): 2243 attr_value = attr_class(attr_value) 2244 setattr(writer, attr_name, attr_value) 2245 # Check dependencies before we try to resume anything. 2246 if any(KeepLocator(ls).permission_expired() 2247 for ls in writer._current_stream_locators): 2248 raise errors.StaleWriterStateError( 2249 "locators include expired permission hint") 2250 writer.check_dependencies() 2251 if state['_current_file'] is not None: 2252 path, pos = state['_current_file'] 2253 try: 2254 writer._queued_file = open(path, 'rb') 2255 writer._queued_file.seek(pos) 2256 except IOError as error: 2257 raise errors.StaleWriterStateError( 2258 u"failed to reopen active file {}: {}".format(path, error)) 2259 return writer
2261 def check_dependencies(self): 2262 for path, orig_stat in listitems(self._dependencies): 2263 if not S_ISREG(orig_stat[ST_MODE]): 2264 raise errors.StaleWriterStateError(u"{} not file".format(path)) 2265 try: 2266 now_stat = tuple(os.stat(path)) 2267 except OSError as error: 2268 raise errors.StaleWriterStateError( 2269 u"failed to stat {}: {}".format(path, error)) 2270 if ((not S_ISREG(now_stat[ST_MODE])) or 2271 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or 2272 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])): 2273 raise errors.StaleWriterStateError(u"{} changed".format(path))
2275 def dump_state(self, copy_func=lambda x: x): 2276 state = {attr: copy_func(getattr(self, attr)) 2277 for attr in self.STATE_PROPS} 2278 if self._queued_file is None: 2279 state['_current_file'] = None 2280 else: 2281 state['_current_file'] = (os.path.realpath(self._queued_file.name), 2282 self._queued_file.tell()) 2283 return state
Inherited Members
- CollectionWriter
- num_retries
- replication
- do_queued_work
- write_file
- write_directory_tree
- open
- flush_data
- start_new_file
- set_current_file_name
- current_file_name
- finish_current_file
- start_new_stream
- set_current_stream_name
- current_stream_name
- finish_current_stream
- finish
- portable_data_hash
- manifest_text
- data_locators
- save_new