arvados.collection
==================

Tools to work with Arvados collections.

This module provides high-level interfaces to create, read, and update
Arvados collections. Most users will want to instantiate `Collection`
objects, and use methods like `Collection.open` and `Collection.mkdirs` to
read and write data in the collection. Refer to the Arvados Python SDK
cookbook for [an introduction to using the Collection class][cookbook].

[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4"""Tools to work with Arvados collections 5 6This module provides high-level interfaces to create, read, and update 7Arvados collections. Most users will want to instantiate `Collection` 8objects, and use methods like `Collection.open` and `Collection.mkdirs` to 9read and write data in the collection. Refer to the Arvados Python SDK 10cookbook for [an introduction to using the Collection class][cookbook]. 11 12[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 13""" 14 15from __future__ import absolute_import 16from future.utils import listitems, listvalues, viewkeys 17from builtins import str 18from past.builtins import basestring 19from builtins import object 20import ciso8601 21import datetime 22import errno 23import functools 24import hashlib 25import io 26import logging 27import os 28import re 29import sys 30import threading 31import time 32 33from collections import deque 34from stat import * 35 36from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock 37from .keep import KeepLocator, KeepClient 38from .stream import StreamReader 39from ._normalize_stream import normalize_stream, escape 40from ._ranges import Range, LocatorAndRange 41from .safeapi import ThreadSafeApiCache 42import arvados.config as config 43import arvados.errors as errors 44import arvados.util 45import arvados.events as events 46from arvados.retry import retry_method 47 48from typing import ( 49 Any, 50 Callable, 51 Dict, 52 IO, 53 Iterator, 54 List, 55 Mapping, 56 Optional, 57 Tuple, 58 Union, 59) 60 61if sys.version_info < (3, 8): 62 from typing_extensions import Literal 63else: 64 from typing import Literal 65 66_logger = logging.getLogger('arvados.collection') 67 68ADD = "add" 69"""Argument value for `Collection` methods to represent an 
DEL = "del"
"""Argument value for `Collection` methods to represent a removed item"""
MOD = "mod"
"""Argument value for `Collection` methods to represent a modified item"""
TOK = "tok"
"""Argument value for `Collection` methods to represent an item with token differences"""
FILE = "file"
"""`create_type` value for `Collection.find_or_create`"""
COLLECTION = "collection"
"""`create_type` value for `Collection.find_or_create`"""

# Type aliases used throughout this module. A change list entry is either a
# 3-tuple (ADD/DEL, path, item) or a 4-tuple (MOD/TOK, path, old, new).
ChangeList = List[Union[
    Tuple[Literal[ADD, DEL], str, 'Collection'],
    Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
]]
ChangeType = Literal[ADD, DEL, MOD, TOK]
CollectionItem = Union[ArvadosFile, 'Collection']
ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
CreateType = Literal[COLLECTION, FILE]
Properties = Dict[str, Any]
StorageClasses = List[str]

class CollectionBase(object):
    """Abstract base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __enter__(self):
        """Enter a context block with this collection instance"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context block with this collection instance"""
        pass

    def _my_keep(self):
        # Lazily construct a KeepClient the first time one is needed,
        # reusing this collection's API client and retry settings.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self) -> str:
        """Create a copy of the collection manifest with only size hints

        This method returns a string with the current collection's manifest
        text with all non-portable locator hints like permission hints and
        remote cluster hints removed. The only hints in the returned manifest
        will be size hints.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                # Keep the stream name (first field) as-is. For every Keep
                # locator, drop any "+hint" whose first character is not a
                # digit -- only size hints (e.g. "+1234") start with a digit.
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(arvados.util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
class _WriterFile(_FileLikeObjectBase):
    # Thin write-only file adapter that forwards writes to a collection
    # writer object (`coll_writer`); `close()` finalizes the current file.
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()


class RichCollectionBase(CollectionBase):
    """Base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __init__(self, parent=None):
        self.parent = parent
        # True once this object matches a saved API server record.
        self._committed = False
        # Cached flag: True if any block locator carries a remote (+R) hint.
        self._has_remote_blocks = False
        # Optional change-notification callback; see `subscribe`.
        self._callback = None
        # Mapping of name -> ArvadosFile or Subcollection in this stream.
        self._items = {}

    def _my_api(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def writable(self) -> bool:
        """Indicate whether this collection object can be modified

        This method returns `False` if this object is a `CollectionReader`,
        else `True`.
        """
        raise NotImplementedError()
    def root_collection(self) -> 'Collection':
        """Get this collection's root collection object

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the source Collection object.
        """
        raise NotImplementedError()

    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this collection

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the name of the stream you opened.
        """
        raise NotImplementedError()

    @synchronized
    def has_remote_blocks(self) -> bool:
        """Indicate whether the collection refers to remote data

        Returns `True` if the collection manifest includes any Keep locators
        with a remote hint (`+R`), else `False`.
        """
        # Fast path: use the cached flag; otherwise recurse into children.
        if self._has_remote_blocks:
            return True
        for item in self:
            if self[item].has_remote_blocks():
                return True
        return False

    @synchronized
    def set_has_remote_blocks(self, val: bool) -> None:
        """Cache whether this collection refers to remote blocks

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        Set this collection's cached "has remote blocks" flag to the given
        value. The flag propagates up through parent collections.
        """
        self._has_remote_blocks = val
        if self.parent:
            self.parent.set_has_remote_blocks(val)

    @must_be_writable
    @synchronized
    def find_or_create(
            self,
            path: str,
            create_type: CreateType,
    ) -> CollectionItem:
        """Get the item at the given path, creating it if necessary

        If `path` refers to a stream in this collection, returns a
        corresponding `Subcollection` object. If `path` refers to a file in
        this collection, returns a corresponding
        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
        this collection, then this method creates a new object and returns
        it, creating parent streams as needed. The type of object created is
        determined by the value of `create_type`.

        Arguments:

        * path: str --- The path to find or create within this collection.

        * create_type: Literal[COLLECTION, FILE] --- The type of object to
          create at `path` if one does not exist. Passing `COLLECTION`
          creates a stream and returns the corresponding
          `Subcollection`. Passing `FILE` creates a new file and returns the
          corresponding `arvados.arvfile.ArvadosFile`.
        """
        # Split off the first path component; the remainder (if any) is
        # resolved recursively on the matching subcollection.
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                if item is None:
                    # create new file
                    if create_type == COLLECTION:
                        item = Subcollection(self, pathcomponents[0])
                    else:
                        item = ArvadosFile(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # create new collection
                    item = Subcollection(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                else:
                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        else:
            return self

    @synchronized
    def find(self, path: str) -> CollectionItem:
        """Get the item at the given path

        If `path` refers to a stream in this collection, returns a
        corresponding `Subcollection` object. If `path` refers to a file in
        this collection, returns a corresponding
        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
        this collection, then this method returns `None`. If an intermediate
        path component refers to a file rather than a stream, this method
        raises `NotADirectoryError`.

        Arguments:

        * path: str --- The path to find or create within this collection.
        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        if pathcomponents[0] == '':
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

        item = self._items.get(pathcomponents[0])
        if item is None:
            return None
        elif len(pathcomponents) == 1:
            return item
        else:
            if isinstance(item, RichCollectionBase):
                if pathcomponents[1]:
                    return item.find(pathcomponents[1])
                else:
                    return item
            else:
                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
301 302 Arguments: 303 304 * path: str --- The path to find or create within this collection. 305 """ 306 if not path: 307 raise errors.ArgumentError("Parameter 'path' is empty.") 308 309 pathcomponents = path.split("/", 1) 310 if pathcomponents[0] == '': 311 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 312 313 item = self._items.get(pathcomponents[0]) 314 if item is None: 315 return None 316 elif len(pathcomponents) == 1: 317 return item 318 else: 319 if isinstance(item, RichCollectionBase): 320 if pathcomponents[1]: 321 return item.find(pathcomponents[1]) 322 else: 323 return item 324 else: 325 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 326 327 @synchronized 328 def mkdirs(self, path: str) -> 'Subcollection': 329 """Create and return a subcollection at `path` 330 331 If `path` exists within this collection, raises `FileExistsError`. 332 Otherwise, creates a stream at that path and returns the 333 corresponding `Subcollection`. 334 """ 335 if self.find(path) != None: 336 raise IOError(errno.EEXIST, "Directory or file exists", path) 337 338 return self.find_or_create(path, COLLECTION) 339 340 def open( 341 self, 342 path: str, 343 mode: str="r", 344 encoding: Optional[str]=None 345 ) -> IO: 346 """Open a file-like object within the collection 347 348 This method returns a file-like object that can read and/or write the 349 file located at `path` within the collection. If you attempt to write 350 a `path` that does not exist, the file is created with `find_or_create`. 351 If the file cannot be opened for any other reason, this method raises 352 `OSError` with an appropriate errno. 353 354 Arguments: 355 356 * path: str --- The path of the file to open within this collection 357 358 * mode: str --- The mode to open this file. Supports all the same 359 values as `builtins.open`. 360 361 * encoding: str | None --- The text encoding of the file. Only used 362 when the file is opened in text mode. 
    def modified(self) -> bool:
        """Indicate whether this collection has an API server record

        Returns `False` if this collection corresponds to a record loaded from
        the API server, `True` otherwise.
        """
        return not self.committed()

    @synchronized
    def committed(self):
        """Indicate whether this collection has an API server record

        Returns `True` if this collection corresponds to a record loaded from
        the API server, `False` otherwise.
        """
        return self._committed

    @synchronized
    def set_committed(self, value: bool=True):
        """Cache whether this collection has an API server record

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        Set this collection's cached "committed" flag to the given
        value and propagates it as needed: committing propagates down to
        children, while un-committing propagates up to the parent.
        """
        if value == self._committed:
            return
        if value:
            for k,v in listitems(self._items):
                v.set_committed(True)
            self._committed = True
        else:
            self._committed = False
            if self.parent is not None:
                self.parent.set_committed(False)

    @synchronized
    def __iter__(self) -> Iterator[str]:
        """Iterate names of streams and files in this collection

        This method does not recurse. It only iterates the contents of this
        collection's corresponding stream.
        """
        return iter(viewkeys(self._items))

    @synchronized
    def __getitem__(self, k: str) -> CollectionItem:
        """Get an `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection

        This method does not recurse. If you want to search a path, use
        `RichCollectionBase.find` instead.
        """
        return self._items[k]

    @synchronized
    def __contains__(self, k: str) -> bool:
        """Indicate whether this collection has an item with this name

        This method does not recurse. If you want to check a path, use
        `RichCollectionBase.exists` instead.
        """
        return k in self._items

    @synchronized
    def __len__(self):
        """Get the number of items directly contained in this collection

        This method does not recurse. It only counts the streams and files
        in this collection's corresponding stream.
        """
        return len(self._items)

    @must_be_writable
    @synchronized
    def __delitem__(self, p: str) -> None:
        """Delete an item from this collection's stream

        This method does not recurse. If you want to remove an item by a
        path, use `RichCollectionBase.remove` instead.
        """
        del self._items[p]
        self.set_committed(False)
        self.notify(DEL, self, p, None)

    @synchronized
    def keys(self) -> Iterator[str]:
        """Iterate names of streams and files in this collection

        This method does not recurse. It only iterates the contents of this
        collection's corresponding stream.
        """
        return self._items.keys()
    @synchronized
    def values(self) -> List[CollectionItem]:
        """Get a list of objects in this collection's stream

        The return value includes a `Subcollection` for every stream, and an
        `arvados.arvfile.ArvadosFile` for every file, directly within this
        collection's stream. This method does not recurse.
        """
        return listvalues(self._items)

    @synchronized
    def items(self) -> List[Tuple[str, CollectionItem]]:
        """Get a list of `(name, object)` tuples from this collection's stream

        The return value includes a `Subcollection` for every stream, and an
        `arvados.arvfile.ArvadosFile` for every file, directly within this
        collection's stream. This method does not recurse.
        """
        return listitems(self._items)

    def exists(self, path: str) -> bool:
        """Indicate whether this collection includes an item at `path`

        This method returns `True` if `path` refers to a stream or file within
        this collection, else `False`.

        Arguments:

        * path: str --- The path to check for existence within this collection
        """
        return self.find(path) is not None

    @must_be_writable
    @synchronized
    def remove(self, path: str, recursive: bool=False) -> None:
        """Remove the file or stream at `path`

        Arguments:

        * path: str --- The path of the item to remove from the collection

        * recursive: bool --- Controls the method's behavior if `path` refers
          to a nonempty stream. If `False` (the default), this method raises
          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
          items under the stream.
        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if item is None:
            raise IOError(errno.ENOENT, "File not found", path)
        if len(pathcomponents) == 1:
            # Refuse to remove a nonempty stream unless `recursive` is set.
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self.set_committed(False)
            self.notify(DEL, self, pathcomponents[0], deleteditem)
        else:
            # Delegate removal of the rest of the path to the subcollection.
            item.remove(pathcomponents[1], recursive=recursive)
    def _clonefrom(self, source):
        # Deep-copy every item from `source` into this collection, making
        # this collection the clones' parent.
        for k,v in listitems(source):
            self._items[k] = v.clone(self, k)

    def clone(self):
        raise NotImplementedError()

    @must_be_writable
    @synchronized
    def add(
            self,
            source_obj: CollectionItem,
            target_name: str,
            overwrite: bool=False,
            reparent: bool=False,
    ) -> None:
        """Copy or move a file or subcollection object to this collection

        Arguments:

        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
          to add to this collection

        * target_name: str --- The path inside this collection where
          `source_obj` should be added.

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_name`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_name` will be replaced with `source_obj`.

        * reparent: bool --- Controls whether this method copies or moves
          `source_obj`. If `False` (the default), `source_obj` is copied into
          this collection. If `True`, `source_obj` is moved into this
          collection.
        """
        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)

        # Remember the replaced item (if any) so the notification can carry
        # both the old and new objects.
        modified_from = None
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        if reparent:
            source_obj._reparent(self, target_name)
            item = source_obj
        else:
            item = source_obj.clone(self, target_name)

        self._items[target_name] = item
        self.set_committed(False)
        if not self._has_remote_blocks and source_obj.has_remote_blocks():
            self.set_has_remote_blocks(True)

        if modified_from:
            self.notify(MOD, self, target_name, (modified_from, item))
        else:
            self.notify(ADD, self, target_name, item)
    def _get_src_target(self, source, target_path, source_collection, create_dest):
        # Shared resolution logic for `copy` and `rename`: returns the
        # (source object, destination collection, destination name) triple.
        if source_collection is None:
            source_collection = self

        # Find the object
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found", source)
            sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None

        # Find parent collection the target path
        targetcomponents = target_path.split("/")

        # Determine the name to use: the last target component, or the
        # source's own name when the target path ends with "/".
        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]

        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

        if create_dest:
            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        else:
            if len(targetcomponents) > 1:
                target_dir = self.find("/".join(targetcomponents[0:-1]))
            else:
                target_dir = self

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found", target_name)

        # If the target name refers to an existing stream, place the source
        # inside it under the source's own name (like `cp file dir/`).
        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)
    @must_be_writable
    @synchronized
    def copy(
            self,
            source: Union[str, CollectionItem],
            target_path: str,
            source_collection: Optional['RichCollectionBase']=None,
            overwrite: bool=False,
    ) -> None:
        """Copy a file or subcollection object to this collection

        Arguments:

        * source: str | arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The file or subcollection to
          add to this collection. If `source` is a str, the object will be
          found by looking up this path from `source_collection` (see
          below).

        * target_path: str --- The path inside this collection where the
          source object should be added.

        * source_collection: arvados.collection.Collection | None --- The
          collection to find the source object from when `source` is a
          path. Defaults to the current collection (`self`).

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_path`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_path` will be replaced with `source_obj`.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
        target_dir.add(source_obj, target_name, overwrite, False)

    @must_be_writable
    @synchronized
    def rename(
            self,
            source: Union[str, CollectionItem],
            target_path: str,
            source_collection: Optional['RichCollectionBase']=None,
            overwrite: bool=False,
    ) -> None:
        """Move a file or subcollection object to this collection

        Arguments:

        * source: str | arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The file or subcollection to
          add to this collection. If `source` is a str, the object will be
          found by looking up this path from `source_collection` (see
          below).

        * target_path: str --- The path inside this collection where the
          source object should be added.

        * source_collection: arvados.collection.Collection | None --- The
          collection to find the source object from when `source` is a
          path. Defaults to the current collection (`self`).

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_path`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_path` will be replaced with `source_obj`.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
        # A move mutates the source, so the source collection must be writable.
        if not source_obj.writable():
            raise IOError(errno.EROFS, "Source collection is read only", source)
        target_dir.add(source_obj, target_name, overwrite, True)

    def portable_manifest_text(self, stream_name: str=".") -> str:
        """Get the portable manifest text for this collection

        The portable manifest text is normalized, and does not include access
        tokens. This method does not flush outstanding blocks to Keep.

        Arguments:

        * stream_name: str --- The name to use for this collection's stream in
          the generated manifest. Default `'.'`.
        """
        return self._get_manifest_text(stream_name, True, True)
    @synchronized
    def manifest_text(
            self,
            stream_name: str=".",
            strip: bool=False,
            normalize: bool=False,
            only_committed: bool=False,
    ) -> str:
        """Get the manifest text for this collection

        Arguments:

        * stream_name: str --- The name to use for this collection's stream in
          the generated manifest. Default `'.'`.

        * strip: bool --- Controls whether or not the returned manifest text
          includes access tokens. If `False` (the default), the manifest text
          will include access tokens. If `True`, the manifest text will not
          include access tokens.

        * normalize: bool --- Controls whether or not the returned manifest
          text is normalized. Default `False`.

        * only_committed: bool --- Controls whether or not this method uploads
          pending data to Keep before building and returning the manifest text.
          If `False` (the default), this method will finish uploading all data
          to Keep, then return the final manifest. If `True`, this method will
          build and return a manifest that only refers to the data that has
          finished uploading at the time this method was called.
        """
        if not only_committed:
            self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=only_committed)

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.

        """

        # A regenerated manifest is only needed when the collection has
        # local modifications, no cached manifest, or normalization was
        # explicitly requested; otherwise reuse the cached text.
        if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # Bufferblocks are data not yet written to Keep;
                        # skip them when only committed data was requested.
                        if only_committed:
                            continue
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                         segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(
                    stream_name=os.path.join(stream_name, dirname),
                    strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text
779 780 :normalize: 781 If True, always export the manifest text in normalized form 782 even if the Collection is not modified. If False (default) and the collection 783 is not modified, return the original manifest text even if it is not 784 in normalized form. 785 786 :only_committed: 787 If True, only include blocks that were already committed to Keep. 788 789 """ 790 791 if not self.committed() or self._manifest_text is None or normalize: 792 stream = {} 793 buf = [] 794 sorted_keys = sorted(self.keys()) 795 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]: 796 # Create a stream per file `k` 797 arvfile = self[filename] 798 filestream = [] 799 for segment in arvfile.segments(): 800 loc = segment.locator 801 if arvfile.parent._my_block_manager().is_bufferblock(loc): 802 if only_committed: 803 continue 804 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator() 805 if strip: 806 loc = KeepLocator(loc).stripped() 807 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 808 segment.segment_offset, segment.range_size)) 809 stream[filename] = filestream 810 if stream: 811 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n") 812 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]: 813 buf.append(self[dirname].manifest_text( 814 stream_name=os.path.join(stream_name, dirname), 815 strip=strip, normalize=True, only_committed=only_committed)) 816 return "".join(buf) 817 else: 818 if strip: 819 return self.stripped_manifest() 820 else: 821 return self._manifest_text 822 823 @synchronized 824 def _copy_remote_blocks(self, remote_blocks={}): 825 """Scan through the entire collection and ask Keep to copy remote blocks. 826 827 When accessing a remote collection, blocks will have a remote signature 828 (+R instead of +A). Collect these signatures and request Keep to copy the 829 blocks to the local cluster, returning local (+A) signatures. 
830 831 :remote_blocks: 832 Shared cache of remote to local block mappings. This is used to avoid 833 doing extra work when blocks are shared by more than one file in 834 different subdirectories. 835 836 """ 837 for item in self: 838 remote_blocks = self[item]._copy_remote_blocks(remote_blocks) 839 return remote_blocks 840 841 @synchronized 842 def diff( 843 self, 844 end_collection: 'RichCollectionBase', 845 prefix: str=".", 846 holding_collection: Optional['Collection']=None, 847 ) -> ChangeList: 848 """Build a list of differences between this collection and another 849 850 Arguments: 851 852 * end_collection: arvados.collection.RichCollectionBase --- A 853 collection object with the desired end state. The returned diff 854 list will describe how to go from the current collection object 855 `self` to `end_collection`. 856 857 * prefix: str --- The name to use for this collection's stream in 858 the diff list. Default `'.'`. 859 860 * holding_collection: arvados.collection.Collection | None --- A 861 collection object used to hold objects for the returned diff 862 list. By default, a new empty collection is created. 
    @must_be_writable
    @synchronized
    def apply(self, changes: ChangeList) -> None:
        """Apply a list of changes to this collection

        This method takes a list of changes generated by
        `RichCollectionBase.diff` and applies it to this
        collection. Afterward, the state of this collection object will
        match the state of `end_collection` passed to `diff`. If a change
        conflicts with a local change, it will be saved to an alternate path
        indicating the conflict.

        Arguments:

        * changes: arvados.collection.ChangeList --- The list of differences
          generated by `RichCollectionBase.diff`.
        """
        if changes:
            self.set_committed(False)
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            # Conflicting local edits are preserved under a timestamped name.
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.

    def portable_data_hash(self) -> str:
        """Get the portable data hash for this collection's manifest"""
        if self._manifest_locator and self.committed():
            # If the collection is already saved on the API server, and it's committed
            # then return API server's PDH response.
            return self._portable_data_hash
        else:
            # Otherwise compute the PDH locally: md5 of the portable
            # manifest plus its byte length.
            stripped = self.portable_manifest_text().encode()
            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
944 return self._portable_data_hash 945 else: 946 stripped = self.portable_manifest_text().encode() 947 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) 948 949 @synchronized 950 def subscribe(self, callback: ChangeCallback) -> None: 951 """Set a notify callback for changes to this collection 952 953 Arguments: 954 955 * callback: arvados.collection.ChangeCallback --- The callable to 956 call each time the collection is changed. 957 """ 958 if self._callback is None: 959 self._callback = callback 960 else: 961 raise errors.ArgumentError("A callback is already set on this collection.") 962 963 @synchronized 964 def unsubscribe(self) -> None: 965 """Remove any notify callback set for changes to this collection""" 966 if self._callback is not None: 967 self._callback = None 968 969 @synchronized 970 def notify( 971 self, 972 event: ChangeType, 973 collection: 'RichCollectionBase', 974 name: str, 975 item: CollectionItem, 976 ) -> None: 977 """Notify any subscribed callback about a change to this collection 978 979 .. ATTENTION:: Internal 980 This method is only meant to be used by other Collection methods. 981 982 If a callback has been registered with `RichCollectionBase.subscribe`, 983 it will be called with information about a change to this collection. 984 Then this notification will be propagated to this collection's root. 985 986 Arguments: 987 988 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to 989 the collection. 990 991 * collection: arvados.collection.RichCollectionBase --- The 992 collection that was modified. 993 994 * name: str --- The name of the file or stream within `collection` that 995 was modified. 996 997 * item: arvados.arvfile.ArvadosFile | 998 arvados.collection.Subcollection --- The new contents at `name` 999 within `collection`. 
1000 """ 1001 if self._callback: 1002 self._callback(event, collection, name, item) 1003 self.root_collection().notify(event, collection, name, item) 1004 1005 @synchronized 1006 def __eq__(self, other: Any) -> bool: 1007 """Indicate whether this collection object is equal to another""" 1008 if other is self: 1009 return True 1010 if not isinstance(other, RichCollectionBase): 1011 return False 1012 if len(self._items) != len(other): 1013 return False 1014 for k in self._items: 1015 if k not in other: 1016 return False 1017 if self._items[k] != other[k]: 1018 return False 1019 return True 1020 1021 def __ne__(self, other: Any) -> bool: 1022 """Indicate whether this collection object is not equal to another""" 1023 return not self.__eq__(other) 1024 1025 @synchronized 1026 def flush(self) -> None: 1027 """Upload any pending data to Keep""" 1028 for e in listvalues(self): 1029 e.flush() 1030 1031 1032class Collection(RichCollectionBase): 1033 """Read and manipulate an Arvados collection 1034 1035 This class provides a high-level interface to create, read, and update 1036 Arvados collections and their contents. Refer to the Arvados Python SDK 1037 cookbook for [an introduction to using the Collection class][cookbook]. 
    def __init__(self, manifest_locator_or_text: Optional[str]=None,
                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
                 keep_client: Optional['arvados.keep.KeepClient']=None,
                 num_retries: int=10,
                 parent: Optional['Collection']=None,
                 apiconfig: Optional[Mapping[str, str]]=None,
                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
                 replication_desired: Optional[int]=None,
                 storage_classes_desired: Optional[List[str]]=None,
                 put_threads: Optional[int]=None):
        """Initialize a Collection object

        Arguments:

        * manifest_locator_or_text: str | None --- This string can contain a
          collection manifest text, portable data hash, or UUID. When given a
          portable data hash or UUID, this instance will load a collection
          record from the API server. Otherwise, this instance will represent a
          new collection without an API server record. The default value `None`
          instantiates a new collection with an empty manifest.

        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
          Arvados API client object this instance uses to make requests. If
          none is given, this instance creates its own client using the
          settings from `apiconfig` (see below). If your client instantiates
          many Collection objects, you can help limit memory utilization by
          calling `arvados.api.api` to construct an
          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
          for every Collection.

        * keep_client: arvados.keep.KeepClient | None --- The Keep client
          object this instance uses to make requests. If none is given, this
          instance creates its own client using its `api_client`.

        * num_retries: int --- The number of times that client requests are
          retried. Default 10.

        * parent: arvados.collection.Collection | None --- The parent Collection
          object of this instance, if any. This argument is primarily used by
          other Collection methods; user client code shouldn't need to use it.

        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
          Collection object constructs one from these settings. If no
          mapping is provided, calls `arvados.config.settings` to get these
          parameters from user configuration.

        * block_manager: arvados.arvfile._BlockManager | None --- The
          _BlockManager object used by this instance to coordinate reading
          and writing Keep data blocks. If none is given, this instance
          constructs its own. This argument is primarily used by other
          Collection methods; user client code shouldn't need to use it.

        * replication_desired: int | None --- This controls both the value of
          the `replication_desired` field on API collection records saved by
          this class, as well as the number of Keep services that the object
          writes new data blocks to. If none is given, uses the default value
          configured for the cluster.

        * storage_classes_desired: list[str] | None --- This controls both
          the value of the `storage_classes_desired` field on API collection
          records saved by this class, as well as selecting which specific
          Keep services the object writes new data blocks to. If none is
          given, defaults to an empty list.

        * put_threads: int | None --- The number of threads to run
          simultaneously to upload data blocks to Keep. This value is used when
          building a new `block_manager`. It is unused when a `block_manager`
          is provided.
        """

        if storage_classes_desired and type(storage_classes_desired) is not list:
            raise errors.ArgumentError("storage_classes_desired must be list type.")

        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client

        # Use the keep client from ThreadSafeApiCache
        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
            self._keep_client = self._api_client.keep

        self._block_manager = block_manager
        self.replication_desired = replication_desired
        self._storage_classes_desired = storage_classes_desired
        self.put_threads = put_threads

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries
        # API-record bookkeeping; these stay None until a record is loaded
        # or saved (see _populate / save / save_new).
        self._manifest_locator = None
        self._manifest_text = None
        self._portable_data_hash = None
        self._api_response = None
        self._past_versions = set()

        self.lock = threading.RLock()
        self.events = None

        if manifest_locator_or_text:
            # Decide how to interpret the argument: PDH, UUID, or literal
            # manifest text; anything else is rejected.
            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
                if not self._has_local_collection_uuid():
                    self._has_remote_blocks = True
            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
                # A "+R" hint in a locator marks a block on a remote cluster.
                if '+R' in self._manifest_text:
                    self._has_remote_blocks = True
            else:
                raise errors.ArgumentError(
                    "Argument to CollectionReader is not a manifest or a collection UUID")

            try:
                self._populate()
            except errors.SyntaxError as e:
                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
"""Get this collection's `storage_classes_desired` value""" 1167 return self._storage_classes_desired or [] 1168 1169 def root_collection(self) -> 'Collection': 1170 return self 1171 1172 def get_properties(self) -> Properties: 1173 """Get this collection's properties 1174 1175 This method always returns a dict. If this collection object does not 1176 have an associated API record, or that record does not have any 1177 properties set, this method returns an empty dict. 1178 """ 1179 if self._api_response and self._api_response["properties"]: 1180 return self._api_response["properties"] 1181 else: 1182 return {} 1183 1184 def get_trash_at(self) -> Optional[datetime.datetime]: 1185 """Get this collection's `trash_at` field 1186 1187 This method parses the `trash_at` field of the collection's API 1188 record and returns a datetime from it. If that field is not set, or 1189 this collection object does not have an associated API record, 1190 returns None. 1191 """ 1192 if self._api_response and self._api_response["trash_at"]: 1193 try: 1194 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1195 except ValueError: 1196 return None 1197 else: 1198 return None 1199 1200 def stream_name(self) -> str: 1201 return "." 1202 1203 def writable(self) -> bool: 1204 return True 1205 1206 @synchronized 1207 def known_past_version( 1208 self, 1209 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] 1210 ) -> bool: 1211 """Indicate whether an API record for this collection has been seen before 1212 1213 As this collection object loads records from the API server, it records 1214 their `modified_at` and `portable_data_hash` fields. This method accepts 1215 a 2-tuple with values for those fields, and returns `True` if the 1216 combination was previously loaded. 
    @synchronized
    @retry_method
    def update(
            self,
            other: Optional['Collection']=None,
            num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it. If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
        """
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._remember_api_response(response)
            other = CollectionReader(response["manifest_text"])
        # Three-way merge: diff our last-known manifest (the baseline)
        # against `other`, then apply those changes onto this instance,
        # flagging conflicts as needed.
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        self._manifest_text = self.manifest_text()
response): 1300 self._api_response = response 1301 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) 1302 1303 def _populate_from_api_server(self): 1304 # As in KeepClient itself, we must wait until the last 1305 # possible moment to instantiate an API client, in order to 1306 # avoid tripping up clients that don't have access to an API 1307 # server. If we do build one, make sure our Keep client uses 1308 # it. If instantiation fails, we'll fall back to the except 1309 # clause, just like any other Collection lookup 1310 # failure. Return an exception, or None if successful. 1311 self._remember_api_response(self._my_api().collections().get( 1312 uuid=self._manifest_locator).execute( 1313 num_retries=self.num_retries)) 1314 self._manifest_text = self._api_response['manifest_text'] 1315 self._portable_data_hash = self._api_response['portable_data_hash'] 1316 # If not overriden via kwargs, we should try to load the 1317 # replication_desired and storage_classes_desired from the API server 1318 if self.replication_desired is None: 1319 self.replication_desired = self._api_response.get('replication_desired', None) 1320 if self._storage_classes_desired is None: 1321 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None) 1322 1323 def _populate(self): 1324 if self._manifest_text is None: 1325 if self._manifest_locator is None: 1326 return 1327 else: 1328 self._populate_from_api_server() 1329 self._baseline_manifest = self._manifest_text 1330 self._import_manifest(self._manifest_text) 1331 1332 def _has_collection_uuid(self): 1333 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) 1334 1335 def _has_local_collection_uuid(self): 1336 return self._has_collection_uuid and \ 1337 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0] 1338 1339 def __enter__(self): 1340 return self 1341 1342 def __exit__(self, 
    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context with this collection instance

        If no exception was raised inside the context block, and this
        collection is writable and has a corresponding API record, that
        record will be updated to match the state of this instance at the end
        of the block.
        """
        if exc_type is None:
            # Clean exit: persist pending changes, but only when this
            # collection is writable and is backed by a collection UUID.
            if self.writable() and self._has_collection_uuid():
                self.save()
        # Always stop the block manager's background threads, even when an
        # exception is propagating out of the `with` block.
        self.stop_threads()
    @must_be_writable
    @synchronized
    @retry_method
    def save(
            self,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            merge: bool=True,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ) -> str:
        """Save collection to an existing API record

        This method updates the instance's corresponding API record to match
        the instance's state. If this instance does not have a corresponding API
        record yet, raises `AssertionError`. (To create a new API record, use
        `Collection.save_new`.) This method returns the saved collection
        manifest.

        Arguments:

        * properties: dict[str, Any] | None --- If provided, the API record will
          be updated with these properties. Note this will completely replace
          any existing properties.

        * storage_classes: list[str] | None --- If provided, the API record will
          be updated with this value in the `storage_classes_desired` field.
          This value will also be saved on the instance and used for any
          changes that follow.

        * trash_at: datetime.datetime | None --- If provided, the API record
          will be updated with this value in the `trash_at` field.

        * merge: bool --- If `True` (the default), this method will first
          reload this collection's API record, and merge any new contents into
          this instance before saving changes. See `Collection.update` for
          details.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed to directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        # Validate argument types up front, before any server traffic.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        # Build the record update body from the optional arguments.
        body={}
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            # There are uncommitted local changes to push.
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            # Make sure all pending data blocks are written to Keep first.
            self._my_block_manager().commit_all()

            if merge:
                # Pull in any server-side changes before overwriting the record.
                self.update()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)
        elif body:
            # Nothing changed in the manifest, but metadata fields were
            # requested; update just those.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))

        return self._manifest_text
    @must_be_writable
    @synchronized
    @retry_method
    def save_new(
            self,
            name: Optional[str]=None,
            create_collection_record: bool=True,
            owner_uuid: Optional[str]=None,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            ensure_unique_name: bool=False,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ):
        """Save collection to a new API record

        This method finishes uploading new data blocks and (optionally)
        creates a new API collection record with the provided data. If a new
        record is created, this instance becomes associated with that record
        for future updates like `save()`. This method returns the saved
        collection manifest.

        Arguments:

        * name: str | None --- The `name` field to use on the new collection
          record. If not specified, a generic default name is generated.

        * create_collection_record: bool --- If `True` (the default), creates a
          collection record on the API server. If `False`, the method finishes
          all data uploads and only returns the resulting collection manifest
          without sending it to the API server.

        * owner_uuid: str | None --- The `owner_uuid` field to use on the
          new collection record.

        * properties: dict[str, Any] | None --- The `properties` field to use on
          the new collection record.

        * storage_classes: list[str] | None --- The
          `storage_classes_desired` field to use on the new collection record.

        * trash_at: datetime.datetime | None --- The `trash_at` field to use
          on the new collection record.

        * ensure_unique_name: bool --- This value is passed directly to the
          Arvados API when creating the collection record. If `True`, the API
          server may modify the submitted `name` to ensure the collection's
          `name`+`owner_uuid` combination is unique. If `False` (the default),
          if a collection already exists with this same `name`+`owner_uuid`
          combination, creating a collection record will raise a validation
          error.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed to directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.
        """
        # Validate argument types up front, before any server traffic.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False

        if storage_classes:
            self._storage_classes_desired = storage_classes

        # Flush all pending data blocks to Keep before capturing the manifest.
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                # With a generated name, uniqueness conflicts are spurious;
                # let the server de-duplicate.
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "name": name,
                    "replication_desired": self.replication_desired}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid
            if properties:
                body["properties"] = properties
            if self.storage_classes_desired():
                body["storage_classes_desired"] = self.storage_classes_desired()
            if trash_at:
                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                body["trash_at"] = t
            if preserve_version:
                body["preserve_version"] = preserve_version

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            # Adopt the server's canonical manifest (it may add hints).
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]
            self._portable_data_hash = self._api_response["portable_data_hash"]

            self._manifest_text = text
            self.set_committed(True)

        return text
self.set_committed(True) 1667 1668 return text 1669 1670 _token_re = re.compile(r'(\S+)(\s+|$)') 1671 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') 1672 _segment_re = re.compile(r'(\d+):(\d+):(\S+)') 1673 1674 def _unescape_manifest_path(self, path): 1675 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) 1676 1677 @synchronized 1678 def _import_manifest(self, manifest_text): 1679 """Import a manifest into a `Collection`. 1680 1681 :manifest_text: 1682 The manifest text to import from. 1683 1684 """ 1685 if len(self) > 0: 1686 raise ArgumentError("Can only import manifest into an empty collection") 1687 1688 STREAM_NAME = 0 1689 BLOCKS = 1 1690 SEGMENTS = 2 1691 1692 stream_name = None 1693 state = STREAM_NAME 1694 1695 for token_and_separator in self._token_re.finditer(manifest_text): 1696 tok = token_and_separator.group(1) 1697 sep = token_and_separator.group(2) 1698 1699 if state == STREAM_NAME: 1700 # starting a new stream 1701 stream_name = self._unescape_manifest_path(tok) 1702 blocks = [] 1703 segments = [] 1704 streamoffset = 0 1705 state = BLOCKS 1706 self.find_or_create(stream_name, COLLECTION) 1707 continue 1708 1709 if state == BLOCKS: 1710 block_locator = self._block_re.match(tok) 1711 if block_locator: 1712 blocksize = int(block_locator.group(1)) 1713 blocks.append(Range(tok, streamoffset, blocksize, 0)) 1714 streamoffset += blocksize 1715 else: 1716 state = SEGMENTS 1717 1718 if state == SEGMENTS: 1719 file_segment = self._segment_re.match(tok) 1720 if file_segment: 1721 pos = int(file_segment.group(1)) 1722 size = int(file_segment.group(2)) 1723 name = self._unescape_manifest_path(file_segment.group(3)) 1724 if name.split('/')[-1] == '.': 1725 # placeholder for persisting an empty directory, not a real file 1726 if len(name) > 2: 1727 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) 1728 else: 1729 filepath = os.path.join(stream_name, name) 1730 try: 1731 afile = self.find_or_create(filepath, 
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) within an Arvados
    `Collection`. It is returned by `Collection.find` and provides the same API.
    Operations that work on the API collection record propagate to the parent
    `Collection` object.
    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        self.name = name
        self.num_retries = parent.num_retries
        self._manifest_text = None
        # Share the root collection's lock so the whole collection tree is
        # guarded by a single mutex.
        self.lock = self.root_collection().lock

    def root_collection(self) -> 'Collection':
        # Walk up the parent chain; the top is the owning `Collection`.
        return self.parent.root_collection()

    def writable(self) -> bool:
        return self.root_collection().writable()

    def _my_api(self):
        # API, Keep, and block-manager clients all live on the root.
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self) -> str:
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(
        self,
        new_parent: Optional['Collection']=None,
        new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Return a copy of this stream's contents under a new parent/name"""
        duplicate = Subcollection(new_parent, new_name)
        duplicate._clonefrom(self)
        return duplicate

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        # Detach from the current parent first, then re-attach under the new
        # parent and name; the statement order matters here.
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Encode empty directories by using an \056-named (".") empty file"""
        if self._items:
            return super(Subcollection, self)._get_manifest_text(
                stream_name, strip, normalize, only_committed)
        # An empty stream would otherwise produce no manifest tokens; emit a
        # zero-length file named "." (escaped as \056) so the empty
        # directory survives a manifest round-trip.
        return "%s %s 0:0:\\056\n" % (
            escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
You can 1832 use this class for additional code safety when you only need to read 1833 existing collections. 1834 """ 1835 def __init__(self, manifest_locator_or_text, *args, **kwargs): 1836 self._in_init = True 1837 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs) 1838 self._in_init = False 1839 1840 # Forego any locking since it should never change once initialized. 1841 self.lock = NoopLock() 1842 1843 # Backwards compatability with old CollectionReader 1844 # all_streams() and all_files() 1845 self._streams = None 1846 1847 def writable(self) -> bool: 1848 return self._in_init 1849 1850 def _populate_streams(orig_func): 1851 @functools.wraps(orig_func) 1852 def populate_streams_wrapper(self, *args, **kwargs): 1853 # Defer populating self._streams until needed since it creates a copy of the manifest. 1854 if self._streams is None: 1855 if self._manifest_text: 1856 self._streams = [sline.split() 1857 for sline in self._manifest_text.split("\n") 1858 if sline] 1859 else: 1860 self._streams = [] 1861 return orig_func(self, *args, **kwargs) 1862 return populate_streams_wrapper 1863 1864 @arvados.util._deprecated('3.0', 'Collection iteration') 1865 @_populate_streams 1866 def normalize(self): 1867 """Normalize the streams returned by `all_streams`""" 1868 streams = {} 1869 for s in self.all_streams(): 1870 for f in s.all_files(): 1871 streamname, filename = split(s.name() + "/" + f.name()) 1872 if streamname not in streams: 1873 streams[streamname] = {} 1874 if filename not in streams[streamname]: 1875 streams[streamname][filename] = [] 1876 for r in f.segments: 1877 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size)) 1878 1879 self._streams = [normalize_stream(s, streams[s]) 1880 for s in sorted(streams)] 1881 1882 @arvados.util._deprecated('3.0', 'Collection iteration') 1883 @_populate_streams 1884 def all_streams(self): 1885 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries) 1886 
for s in self._streams] 1887 1888 @arvados.util._deprecated('3.0', 'Collection iteration') 1889 @_populate_streams 1890 def all_files(self): 1891 for s in self.all_streams(): 1892 for f in s.all_files(): 1893 yield f 1894 1895 1896class CollectionWriter(CollectionBase): 1897 """Create a new collection from scratch 1898 1899 .. WARNING:: Deprecated 1900 This class is deprecated. Prefer `arvados.collection.Collection` 1901 instead. 1902 """ 1903 1904 @arvados.util._deprecated('3.0', 'arvados.collection.Collection') 1905 def __init__(self, api_client=None, num_retries=0, replication=None): 1906 """Instantiate a CollectionWriter. 1907 1908 CollectionWriter lets you build a new Arvados Collection from scratch. 1909 Write files to it. The CollectionWriter will upload data to Keep as 1910 appropriate, and provide you with the Collection manifest text when 1911 you're finished. 1912 1913 Arguments: 1914 * api_client: The API client to use to look up Collections. If not 1915 provided, CollectionReader will build one from available Arvados 1916 configuration. 1917 * num_retries: The default number of times to retry failed 1918 service requests. Default 0. You may change this value 1919 after instantiation, but note those changes may not 1920 propagate to related objects like the Keep client. 1921 * replication: The number of copies of each block to store. 1922 If this argument is None or not supplied, replication is 1923 the server-provided default if available, otherwise 2. 1924 """ 1925 self._api_client = api_client 1926 self.num_retries = num_retries 1927 self.replication = (2 if replication is None else replication) 1928 self._keep_client = None 1929 self._data_buffer = [] 1930 self._data_buffer_len = 0 1931 self._current_stream_files = [] 1932 self._current_stream_length = 0 1933 self._current_stream_locators = [] 1934 self._current_stream_name = '.' 
1935 self._current_file_name = None 1936 self._current_file_pos = 0 1937 self._finished_streams = [] 1938 self._close_file = None 1939 self._queued_file = None 1940 self._queued_dirents = deque() 1941 self._queued_trees = deque() 1942 self._last_open = None 1943 1944 def __exit__(self, exc_type, exc_value, traceback): 1945 if exc_type is None: 1946 self.finish() 1947 1948 def do_queued_work(self): 1949 # The work queue consists of three pieces: 1950 # * _queued_file: The file object we're currently writing to the 1951 # Collection. 1952 # * _queued_dirents: Entries under the current directory 1953 # (_queued_trees[0]) that we want to write or recurse through. 1954 # This may contain files from subdirectories if 1955 # max_manifest_depth == 0 for this directory. 1956 # * _queued_trees: Directories that should be written as separate 1957 # streams to the Collection. 1958 # This function handles the smallest piece of work currently queued 1959 # (current file, then current directory, then next directory) until 1960 # no work remains. The _work_THING methods each do a unit of work on 1961 # THING. _queue_THING methods add a THING to the work queue. 
1962 while True: 1963 if self._queued_file: 1964 self._work_file() 1965 elif self._queued_dirents: 1966 self._work_dirents() 1967 elif self._queued_trees: 1968 self._work_trees() 1969 else: 1970 break 1971 1972 def _work_file(self): 1973 while True: 1974 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE) 1975 if not buf: 1976 break 1977 self.write(buf) 1978 self.finish_current_file() 1979 if self._close_file: 1980 self._queued_file.close() 1981 self._close_file = None 1982 self._queued_file = None 1983 1984 def _work_dirents(self): 1985 path, stream_name, max_manifest_depth = self._queued_trees[0] 1986 if stream_name != self.current_stream_name(): 1987 self.start_new_stream(stream_name) 1988 while self._queued_dirents: 1989 dirent = self._queued_dirents.popleft() 1990 target = os.path.join(path, dirent) 1991 if os.path.isdir(target): 1992 self._queue_tree(target, 1993 os.path.join(stream_name, dirent), 1994 max_manifest_depth - 1) 1995 else: 1996 self._queue_file(target, dirent) 1997 break 1998 if not self._queued_dirents: 1999 self._queued_trees.popleft() 2000 2001 def _work_trees(self): 2002 path, stream_name, max_manifest_depth = self._queued_trees[0] 2003 d = arvados.util.listdir_recursive( 2004 path, max_depth = (None if max_manifest_depth == 0 else 0)) 2005 if d: 2006 self._queue_dirents(stream_name, d) 2007 else: 2008 self._queued_trees.popleft() 2009 2010 def _queue_file(self, source, filename=None): 2011 assert (self._queued_file is None), "tried to queue more than one file" 2012 if not hasattr(source, 'read'): 2013 source = open(source, 'rb') 2014 self._close_file = True 2015 else: 2016 self._close_file = False 2017 if filename is None: 2018 filename = os.path.basename(source.name) 2019 self.start_new_file(filename) 2020 self._queued_file = source 2021 2022 def _queue_dirents(self, stream_name, dirents): 2023 assert (not self._queued_dirents), "tried to queue more than one tree" 2024 self._queued_dirents = deque(sorted(dirents)) 2025 2026 def 
_queue_tree(self, path, stream_name, max_manifest_depth): 2027 self._queued_trees.append((path, stream_name, max_manifest_depth)) 2028 2029 def write_file(self, source, filename=None): 2030 self._queue_file(source, filename) 2031 self.do_queued_work() 2032 2033 def write_directory_tree(self, 2034 path, stream_name='.', max_manifest_depth=-1): 2035 self._queue_tree(path, stream_name, max_manifest_depth) 2036 self.do_queued_work() 2037 2038 def write(self, newdata): 2039 if isinstance(newdata, bytes): 2040 pass 2041 elif isinstance(newdata, str): 2042 newdata = newdata.encode() 2043 elif hasattr(newdata, '__iter__'): 2044 for s in newdata: 2045 self.write(s) 2046 return 2047 self._data_buffer.append(newdata) 2048 self._data_buffer_len += len(newdata) 2049 self._current_stream_length += len(newdata) 2050 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE: 2051 self.flush_data() 2052 2053 def open(self, streampath, filename=None): 2054 """open(streampath[, filename]) -> file-like object 2055 2056 Pass in the path of a file to write to the Collection, either as a 2057 single string or as two separate stream name and file name arguments. 2058 This method returns a file-like object you can write to add it to the 2059 Collection. 2060 2061 You may only have one file object from the Collection open at a time, 2062 so be sure to close the object when you're done. 
Using the object in 2063 a with statement makes that easy: 2064 2065 with cwriter.open('./doc/page1.txt') as outfile: 2066 outfile.write(page1_data) 2067 with cwriter.open('./doc/page2.txt') as outfile: 2068 outfile.write(page2_data) 2069 """ 2070 if filename is None: 2071 streampath, filename = split(streampath) 2072 if self._last_open and not self._last_open.closed: 2073 raise errors.AssertionError( 2074 u"can't open '{}' when '{}' is still open".format( 2075 filename, self._last_open.name)) 2076 if streampath != self.current_stream_name(): 2077 self.start_new_stream(streampath) 2078 self.set_current_file_name(filename) 2079 self._last_open = _WriterFile(self, filename) 2080 return self._last_open 2081 2082 def flush_data(self): 2083 data_buffer = b''.join(self._data_buffer) 2084 if data_buffer: 2085 self._current_stream_locators.append( 2086 self._my_keep().put( 2087 data_buffer[0:config.KEEP_BLOCK_SIZE], 2088 copies=self.replication)) 2089 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]] 2090 self._data_buffer_len = len(self._data_buffer[0]) 2091 2092 def start_new_file(self, newfilename=None): 2093 self.finish_current_file() 2094 self.set_current_file_name(newfilename) 2095 2096 def set_current_file_name(self, newfilename): 2097 if re.search(r'[\t\n]', newfilename): 2098 raise errors.AssertionError( 2099 "Manifest filenames cannot contain whitespace: %s" % 2100 newfilename) 2101 elif re.search(r'\x00', newfilename): 2102 raise errors.AssertionError( 2103 "Manifest filenames cannot contain NUL characters: %s" % 2104 newfilename) 2105 self._current_file_name = newfilename 2106 2107 def current_file_name(self): 2108 return self._current_file_name 2109 2110 def finish_current_file(self): 2111 if self._current_file_name is None: 2112 if self._current_file_pos == self._current_stream_length: 2113 return 2114 raise errors.AssertionError( 2115 "Cannot finish an unnamed file " + 2116 "(%d bytes at offset %d in '%s' stream)" % 2117 (self._current_stream_length 
- self._current_file_pos, 2118 self._current_file_pos, 2119 self._current_stream_name)) 2120 self._current_stream_files.append([ 2121 self._current_file_pos, 2122 self._current_stream_length - self._current_file_pos, 2123 self._current_file_name]) 2124 self._current_file_pos = self._current_stream_length 2125 self._current_file_name = None 2126 2127 def start_new_stream(self, newstreamname='.'): 2128 self.finish_current_stream() 2129 self.set_current_stream_name(newstreamname) 2130 2131 def set_current_stream_name(self, newstreamname): 2132 if re.search(r'[\t\n]', newstreamname): 2133 raise errors.AssertionError( 2134 "Manifest stream names cannot contain whitespace: '%s'" % 2135 (newstreamname)) 2136 self._current_stream_name = '.' if newstreamname=='' else newstreamname 2137 2138 def current_stream_name(self): 2139 return self._current_stream_name 2140 2141 def finish_current_stream(self): 2142 self.finish_current_file() 2143 self.flush_data() 2144 if not self._current_stream_files: 2145 pass 2146 elif self._current_stream_name is None: 2147 raise errors.AssertionError( 2148 "Cannot finish an unnamed stream (%d bytes in %d files)" % 2149 (self._current_stream_length, len(self._current_stream_files))) 2150 else: 2151 if not self._current_stream_locators: 2152 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR) 2153 self._finished_streams.append([self._current_stream_name, 2154 self._current_stream_locators, 2155 self._current_stream_files]) 2156 self._current_stream_files = [] 2157 self._current_stream_length = 0 2158 self._current_stream_locators = [] 2159 self._current_stream_name = None 2160 self._current_file_pos = 0 2161 self._current_file_name = None 2162 2163 def finish(self): 2164 """Store the manifest in Keep and return its locator. 2165 2166 This is useful for storing manifest fragments (task outputs) 2167 temporarily in Keep during a Crunch job. 
2168 2169 In other cases you should make a collection instead, by 2170 sending manifest_text() to the API server's "create 2171 collection" endpoint. 2172 """ 2173 return self._my_keep().put(self.manifest_text().encode(), 2174 copies=self.replication) 2175 2176 def portable_data_hash(self): 2177 stripped = self.stripped_manifest().encode() 2178 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) 2179 2180 def manifest_text(self): 2181 self.finish_current_stream() 2182 manifest = '' 2183 2184 for stream in self._finished_streams: 2185 if not re.search(r'^\.(/.*)?$', stream[0]): 2186 manifest += './' 2187 manifest += stream[0].replace(' ', '\\040') 2188 manifest += ' ' + ' '.join(stream[1]) 2189 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2]) 2190 manifest += "\n" 2191 2192 return manifest 2193 2194 def data_locators(self): 2195 ret = [] 2196 for name, locators, files in self._finished_streams: 2197 ret += locators 2198 return ret 2199 2200 def save_new(self, name=None): 2201 return self._api_client.collections().create( 2202 ensure_unique_name=True, 2203 body={ 2204 'name': name, 2205 'manifest_text': self.manifest_text(), 2206 }).execute(num_retries=self.num_retries) 2207 2208 2209class ResumableCollectionWriter(CollectionWriter): 2210 """CollectionWriter that can serialize internal state to disk 2211 2212 .. WARNING:: Deprecated 2213 This class is deprecated. Prefer `arvados.collection.Collection` 2214 instead. 
2215 """ 2216 2217 STATE_PROPS = ['_current_stream_files', '_current_stream_length', 2218 '_current_stream_locators', '_current_stream_name', 2219 '_current_file_name', '_current_file_pos', '_close_file', 2220 '_data_buffer', '_dependencies', '_finished_streams', 2221 '_queued_dirents', '_queued_trees'] 2222 2223 @arvados.util._deprecated('3.0', 'arvados.collection.Collection') 2224 def __init__(self, api_client=None, **kwargs): 2225 self._dependencies = {} 2226 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs) 2227 2228 @classmethod 2229 def from_state(cls, state, *init_args, **init_kwargs): 2230 # Try to build a new writer from scratch with the given state. 2231 # If the state is not suitable to resume (because files have changed, 2232 # been deleted, aren't predictable, etc.), raise a 2233 # StaleWriterStateError. Otherwise, return the initialized writer. 2234 # The caller is responsible for calling writer.do_queued_work() 2235 # appropriately after it's returned. 2236 writer = cls(*init_args, **init_kwargs) 2237 for attr_name in cls.STATE_PROPS: 2238 attr_value = state[attr_name] 2239 attr_class = getattr(writer, attr_name).__class__ 2240 # Coerce the value into the same type as the initial value, if 2241 # needed. 2242 if attr_class not in (type(None), attr_value.__class__): 2243 attr_value = attr_class(attr_value) 2244 setattr(writer, attr_name, attr_value) 2245 # Check dependencies before we try to resume anything. 
2246 if any(KeepLocator(ls).permission_expired() 2247 for ls in writer._current_stream_locators): 2248 raise errors.StaleWriterStateError( 2249 "locators include expired permission hint") 2250 writer.check_dependencies() 2251 if state['_current_file'] is not None: 2252 path, pos = state['_current_file'] 2253 try: 2254 writer._queued_file = open(path, 'rb') 2255 writer._queued_file.seek(pos) 2256 except IOError as error: 2257 raise errors.StaleWriterStateError( 2258 u"failed to reopen active file {}: {}".format(path, error)) 2259 return writer 2260 2261 def check_dependencies(self): 2262 for path, orig_stat in listitems(self._dependencies): 2263 if not S_ISREG(orig_stat[ST_MODE]): 2264 raise errors.StaleWriterStateError(u"{} not file".format(path)) 2265 try: 2266 now_stat = tuple(os.stat(path)) 2267 except OSError as error: 2268 raise errors.StaleWriterStateError( 2269 u"failed to stat {}: {}".format(path, error)) 2270 if ((not S_ISREG(now_stat[ST_MODE])) or 2271 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or 2272 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])): 2273 raise errors.StaleWriterStateError(u"{} changed".format(path)) 2274 2275 def dump_state(self, copy_func=lambda x: x): 2276 state = {attr: copy_func(getattr(self, attr)) 2277 for attr in self.STATE_PROPS} 2278 if self._queued_file is None: 2279 state['_current_file'] = None 2280 else: 2281 state['_current_file'] = (os.path.realpath(self._queued_file.name), 2282 self._queued_file.tell()) 2283 return state 2284 2285 def _queue_file(self, source, filename=None): 2286 try: 2287 src_path = os.path.realpath(source) 2288 except Exception: 2289 raise errors.AssertionError(u"{} not a file path".format(source)) 2290 try: 2291 path_stat = os.stat(src_path) 2292 except OSError as stat_error: 2293 path_stat = None 2294 super(ResumableCollectionWriter, self)._queue_file(source, filename) 2295 fd_stat = os.fstat(self._queued_file.fileno()) 2296 if not S_ISREG(fd_stat.st_mode): 2297 # We won't be able to resume from this 
cache anyway, so don't 2298 # worry about further checks. 2299 self._dependencies[source] = tuple(fd_stat) 2300 elif path_stat is None: 2301 raise errors.AssertionError( 2302 u"could not stat {}: {}".format(source, stat_error)) 2303 elif path_stat.st_ino != fd_stat.st_ino: 2304 raise errors.AssertionError( 2305 u"{} changed between open and stat calls".format(source)) 2306 else: 2307 self._dependencies[src_path] = tuple(fd_stat) 2308 2309 def write(self, data): 2310 if self._queued_file is None: 2311 raise errors.AssertionError( 2312 "resumable writer can't accept unsourced data") 2313 return super(ResumableCollectionWriter, self).write(data)
Argument value for Collection
methods to represent an added item
Argument value for Collection
methods to represent a removed item
Argument value for Collection
methods to represent a modified item
Argument value for Collection
methods to represent an item with token differences
create_type
value for Collection.find_or_create
create_type
value for Collection.find_or_create
class CollectionBase(object):
    """Abstract base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __enter__(self):
        """Enter a context block with this collection instance"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context block with this collection instance"""
        pass

    def _my_keep(self):
        # Lazily construct a KeepClient the first time block data is needed.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self) -> str:
        """Create a copy of the collection manifest with only size hints

        This method returns a string with the current collection's manifest
        text with all non-portable locator hints like permission hints and
        remote cluster hints removed. The only hints in the returned manifest
        will be size hints.
        """
        clean = []
        for line in self.manifest_text().split("\n"):
            fields = line.split()
            if not fields:
                continue
            # Keep the stream name untouched; strip non-size hints from every
            # token that looks like a Keep locator.
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(arvados.util.keep_locator_pattern, x)
                 else x)
                for x in fields[1:]]
            clean.append(' '.join(clean_fields))
            clean.append("\n")
        return ''.join(clean)
Abstract base class for Collection classes
116 def stripped_manifest(self) -> str: 117 """Create a copy of the collection manifest with only size hints 118 119 This method returns a string with the current collection's manifest 120 text with all non-portable locator hints like permission hints and 121 remote cluster hints removed. The only hints in the returned manifest 122 will be size hints. 123 """ 124 raw = self.manifest_text() 125 clean = [] 126 for line in raw.split("\n"): 127 fields = line.split() 128 if fields: 129 clean_fields = fields[:1] + [ 130 (re.sub(r'\+[^\d][^\+]*', '', x) 131 if re.match(arvados.util.keep_locator_pattern, x) 132 else x) 133 for x in fields[1:]] 134 clean += [' '.join(clean_fields), "\n"] 135 return ''.join(clean)
Create a copy of the collection manifest with only size hints
This method returns a string with the current collection’s manifest text with all non-portable locator hints like permission hints and remote cluster hints removed. The only hints in the returned manifest will be size hints.
161class RichCollectionBase(CollectionBase): 162 """Base class for Collection classes 163 164 .. ATTENTION:: Internal 165 This class is meant to be used by other parts of the SDK. User code 166 should instantiate or subclass `Collection` or one of its subclasses 167 directly. 168 """ 169 170 def __init__(self, parent=None): 171 self.parent = parent 172 self._committed = False 173 self._has_remote_blocks = False 174 self._callback = None 175 self._items = {} 176 177 def _my_api(self): 178 raise NotImplementedError() 179 180 def _my_keep(self): 181 raise NotImplementedError() 182 183 def _my_block_manager(self): 184 raise NotImplementedError() 185 186 def writable(self) -> bool: 187 """Indicate whether this collection object can be modified 188 189 This method returns `False` if this object is a `CollectionReader`, 190 else `True`. 191 """ 192 raise NotImplementedError() 193 194 def root_collection(self) -> 'Collection': 195 """Get this collection's root collection object 196 197 If you open a subcollection with `Collection.find`, calling this method 198 on that subcollection returns the source Collection object. 199 """ 200 raise NotImplementedError() 201 202 def stream_name(self) -> str: 203 """Get the name of the manifest stream represented by this collection 204 205 If you open a subcollection with `Collection.find`, calling this method 206 on that subcollection returns the name of the stream you opened. 207 """ 208 raise NotImplementedError() 209 210 @synchronized 211 def has_remote_blocks(self) -> bool: 212 """Indiciate whether the collection refers to remote data 213 214 Returns `True` if the collection manifest includes any Keep locators 215 with a remote hint (`+R`), else `False`. 
216 """ 217 if self._has_remote_blocks: 218 return True 219 for item in self: 220 if self[item].has_remote_blocks(): 221 return True 222 return False 223 224 @synchronized 225 def set_has_remote_blocks(self, val: bool) -> None: 226 """Cache whether this collection refers to remote blocks 227 228 .. ATTENTION:: Internal 229 This method is only meant to be used by other Collection methods. 230 231 Set this collection's cached "has remote blocks" flag to the given 232 value. 233 """ 234 self._has_remote_blocks = val 235 if self.parent: 236 self.parent.set_has_remote_blocks(val) 237 238 @must_be_writable 239 @synchronized 240 def find_or_create( 241 self, 242 path: str, 243 create_type: CreateType, 244 ) -> CollectionItem: 245 """Get the item at the given path, creating it if necessary 246 247 If `path` refers to a stream in this collection, returns a 248 corresponding `Subcollection` object. If `path` refers to a file in 249 this collection, returns a corresponding 250 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 251 this collection, then this method creates a new object and returns 252 it, creating parent streams as needed. The type of object created is 253 determined by the value of `create_type`. 254 255 Arguments: 256 257 * path: str --- The path to find or create within this collection. 258 259 * create_type: Literal[COLLECTION, FILE] --- The type of object to 260 create at `path` if one does not exist. Passing `COLLECTION` 261 creates a stream and returns the corresponding 262 `Subcollection`. Passing `FILE` creates a new file and returns the 263 corresponding `arvados.arvfile.ArvadosFile`. 
264 """ 265 pathcomponents = path.split("/", 1) 266 if pathcomponents[0]: 267 item = self._items.get(pathcomponents[0]) 268 if len(pathcomponents) == 1: 269 if item is None: 270 # create new file 271 if create_type == COLLECTION: 272 item = Subcollection(self, pathcomponents[0]) 273 else: 274 item = ArvadosFile(self, pathcomponents[0]) 275 self._items[pathcomponents[0]] = item 276 self.set_committed(False) 277 self.notify(ADD, self, pathcomponents[0], item) 278 return item 279 else: 280 if item is None: 281 # create new collection 282 item = Subcollection(self, pathcomponents[0]) 283 self._items[pathcomponents[0]] = item 284 self.set_committed(False) 285 self.notify(ADD, self, pathcomponents[0], item) 286 if isinstance(item, RichCollectionBase): 287 return item.find_or_create(pathcomponents[1], create_type) 288 else: 289 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 290 else: 291 return self 292 293 @synchronized 294 def find(self, path: str) -> CollectionItem: 295 """Get the item at the given path 296 297 If `path` refers to a stream in this collection, returns a 298 corresponding `Subcollection` object. If `path` refers to a file in 299 this collection, returns a corresponding 300 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 301 this collection, then this method raises `NotADirectoryError`. 302 303 Arguments: 304 305 * path: str --- The path to find or create within this collection. 
306 """ 307 if not path: 308 raise errors.ArgumentError("Parameter 'path' is empty.") 309 310 pathcomponents = path.split("/", 1) 311 if pathcomponents[0] == '': 312 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 313 314 item = self._items.get(pathcomponents[0]) 315 if item is None: 316 return None 317 elif len(pathcomponents) == 1: 318 return item 319 else: 320 if isinstance(item, RichCollectionBase): 321 if pathcomponents[1]: 322 return item.find(pathcomponents[1]) 323 else: 324 return item 325 else: 326 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 327 328 @synchronized 329 def mkdirs(self, path: str) -> 'Subcollection': 330 """Create and return a subcollection at `path` 331 332 If `path` exists within this collection, raises `FileExistsError`. 333 Otherwise, creates a stream at that path and returns the 334 corresponding `Subcollection`. 335 """ 336 if self.find(path) != None: 337 raise IOError(errno.EEXIST, "Directory or file exists", path) 338 339 return self.find_or_create(path, COLLECTION) 340 341 def open( 342 self, 343 path: str, 344 mode: str="r", 345 encoding: Optional[str]=None 346 ) -> IO: 347 """Open a file-like object within the collection 348 349 This method returns a file-like object that can read and/or write the 350 file located at `path` within the collection. If you attempt to write 351 a `path` that does not exist, the file is created with `find_or_create`. 352 If the file cannot be opened for any other reason, this method raises 353 `OSError` with an appropriate errno. 354 355 Arguments: 356 357 * path: str --- The path of the file to open within this collection 358 359 * mode: str --- The mode to open this file. Supports all the same 360 values as `builtins.open`. 361 362 * encoding: str | None --- The text encoding of the file. Only used 363 when the file is opened in text mode. The default is 364 platform-dependent. 
365 366 """ 367 if not re.search(r'^[rwa][bt]?\+?$', mode): 368 raise errors.ArgumentError("Invalid mode {!r}".format(mode)) 369 370 if mode[0] == 'r' and '+' not in mode: 371 fclass = ArvadosFileReader 372 arvfile = self.find(path) 373 elif not self.writable(): 374 raise IOError(errno.EROFS, "Collection is read only") 375 else: 376 fclass = ArvadosFileWriter 377 arvfile = self.find_or_create(path, FILE) 378 379 if arvfile is None: 380 raise IOError(errno.ENOENT, "File not found", path) 381 if not isinstance(arvfile, ArvadosFile): 382 raise IOError(errno.EISDIR, "Is a directory", path) 383 384 if mode[0] == 'w': 385 arvfile.truncate(0) 386 387 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) 388 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) 389 if 'b' not in mode: 390 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader 391 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) 392 return f 393 394 def modified(self) -> bool: 395 """Indicate whether this collection has an API server record 396 397 Returns `False` if this collection corresponds to a record loaded from 398 the API server, `True` otherwise. 399 """ 400 return not self.committed() 401 402 @synchronized 403 def committed(self): 404 """Indicate whether this collection has an API server record 405 406 Returns `True` if this collection corresponds to a record loaded from 407 the API server, `False` otherwise. 408 """ 409 return self._committed 410 411 @synchronized 412 def set_committed(self, value: bool=True): 413 """Cache whether this collection has an API server record 414 415 .. ATTENTION:: Internal 416 This method is only meant to be used by other Collection methods. 417 418 Set this collection's cached "committed" flag to the given 419 value and propagates it as needed. 
420 """ 421 if value == self._committed: 422 return 423 if value: 424 for k,v in listitems(self._items): 425 v.set_committed(True) 426 self._committed = True 427 else: 428 self._committed = False 429 if self.parent is not None: 430 self.parent.set_committed(False) 431 432 @synchronized 433 def __iter__(self) -> Iterator[str]: 434 """Iterate names of streams and files in this collection 435 436 This method does not recurse. It only iterates the contents of this 437 collection's corresponding stream. 438 """ 439 return iter(viewkeys(self._items)) 440 441 @synchronized 442 def __getitem__(self, k: str) -> CollectionItem: 443 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection 444 445 This method does not recurse. If you want to search a path, use 446 `RichCollectionBase.find` instead. 447 """ 448 return self._items[k] 449 450 @synchronized 451 def __contains__(self, k: str) -> bool: 452 """Indicate whether this collection has an item with this name 453 454 This method does not recurse. It you want to check a path, use 455 `RichCollectionBase.exists` instead. 456 """ 457 return k in self._items 458 459 @synchronized 460 def __len__(self): 461 """Get the number of items directly contained in this collection 462 463 This method does not recurse. It only counts the streams and files 464 in this collection's corresponding stream. 465 """ 466 return len(self._items) 467 468 @must_be_writable 469 @synchronized 470 def __delitem__(self, p: str) -> None: 471 """Delete an item from this collection's stream 472 473 This method does not recurse. If you want to remove an item by a 474 path, use `RichCollectionBase.remove` instead. 475 """ 476 del self._items[p] 477 self.set_committed(False) 478 self.notify(DEL, self, p, None) 479 480 @synchronized 481 def keys(self) -> Iterator[str]: 482 """Iterate names of streams and files in this collection 483 484 This method does not recurse. It only iterates the contents of this 485 collection's corresponding stream. 
486 """ 487 return self._items.keys() 488 489 @synchronized 490 def values(self) -> List[CollectionItem]: 491 """Get a list of objects in this collection's stream 492 493 The return value includes a `Subcollection` for every stream, and an 494 `arvados.arvfile.ArvadosFile` for every file, directly within this 495 collection's stream. This method does not recurse. 496 """ 497 return listvalues(self._items) 498 499 @synchronized 500 def items(self) -> List[Tuple[str, CollectionItem]]: 501 """Get a list of `(name, object)` tuples from this collection's stream 502 503 The return value includes a `Subcollection` for every stream, and an 504 `arvados.arvfile.ArvadosFile` for every file, directly within this 505 collection's stream. This method does not recurse. 506 """ 507 return listitems(self._items) 508 509 def exists(self, path: str) -> bool: 510 """Indicate whether this collection includes an item at `path` 511 512 This method returns `True` if `path` refers to a stream or file within 513 this collection, else `False`. 514 515 Arguments: 516 517 * path: str --- The path to check for existence within this collection 518 """ 519 return self.find(path) is not None 520 521 @must_be_writable 522 @synchronized 523 def remove(self, path: str, recursive: bool=False) -> None: 524 """Remove the file or stream at `path` 525 526 Arguments: 527 528 * path: str --- The path of the item to remove from the collection 529 530 * recursive: bool --- Controls the method's behavior if `path` refers 531 to a nonempty stream. If `False` (the default), this method raises 532 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all 533 items under the stream. 
534 """ 535 if not path: 536 raise errors.ArgumentError("Parameter 'path' is empty.") 537 538 pathcomponents = path.split("/", 1) 539 item = self._items.get(pathcomponents[0]) 540 if item is None: 541 raise IOError(errno.ENOENT, "File not found", path) 542 if len(pathcomponents) == 1: 543 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: 544 raise IOError(errno.ENOTEMPTY, "Directory not empty", path) 545 deleteditem = self._items[pathcomponents[0]] 546 del self._items[pathcomponents[0]] 547 self.set_committed(False) 548 self.notify(DEL, self, pathcomponents[0], deleteditem) 549 else: 550 item.remove(pathcomponents[1], recursive=recursive) 551 552 def _clonefrom(self, source): 553 for k,v in listitems(source): 554 self._items[k] = v.clone(self, k) 555 556 def clone(self): 557 raise NotImplementedError() 558 559 @must_be_writable 560 @synchronized 561 def add( 562 self, 563 source_obj: CollectionItem, 564 target_name: str, 565 overwrite: bool=False, 566 reparent: bool=False, 567 ) -> None: 568 """Copy or move a file or subcollection object to this collection 569 570 Arguments: 571 572 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection 573 to add to this collection 574 575 * target_name: str --- The path inside this collection where 576 `source_obj` should be added. 577 578 * overwrite: bool --- Controls the behavior of this method when the 579 collection already contains an object at `target_name`. If `False` 580 (the default), this method will raise `FileExistsError`. If `True`, 581 the object at `target_name` will be replaced with `source_obj`. 582 583 * reparent: bool --- Controls whether this method copies or moves 584 `source_obj`. If `False` (the default), `source_obj` is copied into 585 this collection. If `True`, `source_obj` is moved into this 586 collection. 
587 """ 588 if target_name in self and not overwrite: 589 raise IOError(errno.EEXIST, "File already exists", target_name) 590 591 modified_from = None 592 if target_name in self: 593 modified_from = self[target_name] 594 595 # Actually make the move or copy. 596 if reparent: 597 source_obj._reparent(self, target_name) 598 item = source_obj 599 else: 600 item = source_obj.clone(self, target_name) 601 602 self._items[target_name] = item 603 self.set_committed(False) 604 if not self._has_remote_blocks and source_obj.has_remote_blocks(): 605 self.set_has_remote_blocks(True) 606 607 if modified_from: 608 self.notify(MOD, self, target_name, (modified_from, item)) 609 else: 610 self.notify(ADD, self, target_name, item) 611 612 def _get_src_target(self, source, target_path, source_collection, create_dest): 613 if source_collection is None: 614 source_collection = self 615 616 # Find the object 617 if isinstance(source, basestring): 618 source_obj = source_collection.find(source) 619 if source_obj is None: 620 raise IOError(errno.ENOENT, "File not found", source) 621 sourcecomponents = source.split("/") 622 else: 623 source_obj = source 624 sourcecomponents = None 625 626 # Find parent collection the target path 627 targetcomponents = target_path.split("/") 628 629 # Determine the name to use. 630 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1] 631 632 if not target_name: 633 raise errors.ArgumentError("Target path is empty and source is an object. 
Cannot determine destination filename to use.") 634 635 if create_dest: 636 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) 637 else: 638 if len(targetcomponents) > 1: 639 target_dir = self.find("/".join(targetcomponents[0:-1])) 640 else: 641 target_dir = self 642 643 if target_dir is None: 644 raise IOError(errno.ENOENT, "Target directory not found", target_name) 645 646 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents: 647 target_dir = target_dir[target_name] 648 target_name = sourcecomponents[-1] 649 650 return (source_obj, target_dir, target_name) 651 652 @must_be_writable 653 @synchronized 654 def copy( 655 self, 656 source: Union[str, CollectionItem], 657 target_path: str, 658 source_collection: Optional['RichCollectionBase']=None, 659 overwrite: bool=False, 660 ) -> None: 661 """Copy a file or subcollection object to this collection 662 663 Arguments: 664 665 * source: str | arvados.arvfile.ArvadosFile | 666 arvados.collection.Subcollection --- The file or subcollection to 667 add to this collection. If `source` is a str, the object will be 668 found by looking up this path from `source_collection` (see 669 below). 670 671 * target_path: str --- The path inside this collection where the 672 source object should be added. 673 674 * source_collection: arvados.collection.Collection | None --- The 675 collection to find the source object from when `source` is a 676 path. Defaults to the current collection (`self`). 677 678 * overwrite: bool --- Controls the behavior of this method when the 679 collection already contains an object at `target_path`. If `False` 680 (the default), this method will raise `FileExistsError`. If `True`, 681 the object at `target_path` will be replaced with `source_obj`. 
682 """ 683 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) 684 target_dir.add(source_obj, target_name, overwrite, False) 685 686 @must_be_writable 687 @synchronized 688 def rename( 689 self, 690 source: Union[str, CollectionItem], 691 target_path: str, 692 source_collection: Optional['RichCollectionBase']=None, 693 overwrite: bool=False, 694 ) -> None: 695 """Move a file or subcollection object to this collection 696 697 Arguments: 698 699 * source: str | arvados.arvfile.ArvadosFile | 700 arvados.collection.Subcollection --- The file or subcollection to 701 add to this collection. If `source` is a str, the object will be 702 found by looking up this path from `source_collection` (see 703 below). 704 705 * target_path: str --- The path inside this collection where the 706 source object should be added. 707 708 * source_collection: arvados.collection.Collection | None --- The 709 collection to find the source object from when `source` is a 710 path. Defaults to the current collection (`self`). 711 712 * overwrite: bool --- Controls the behavior of this method when the 713 collection already contains an object at `target_path`. If `False` 714 (the default), this method will raise `FileExistsError`. If `True`, 715 the object at `target_path` will be replaced with `source_obj`. 716 """ 717 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) 718 if not source_obj.writable(): 719 raise IOError(errno.EROFS, "Source collection is read only", source) 720 target_dir.add(source_obj, target_name, overwrite, True) 721 722 def portable_manifest_text(self, stream_name: str=".") -> str: 723 """Get the portable manifest text for this collection 724 725 The portable manifest text is normalized, and does not include access 726 tokens. This method does not flush outstanding blocks to Keep. 
727 728 Arguments: 729 730 * stream_name: str --- The name to use for this collection's stream in 731 the generated manifest. Default `'.'`. 732 """ 733 return self._get_manifest_text(stream_name, True, True) 734 735 @synchronized 736 def manifest_text( 737 self, 738 stream_name: str=".", 739 strip: bool=False, 740 normalize: bool=False, 741 only_committed: bool=False, 742 ) -> str: 743 """Get the manifest text for this collection 744 745 Arguments: 746 747 * stream_name: str --- The name to use for this collection's stream in 748 the generated manifest. Default `'.'`. 749 750 * strip: bool --- Controls whether or not the returned manifest text 751 includes access tokens. If `False` (the default), the manifest text 752 will include access tokens. If `True`, the manifest text will not 753 include access tokens. 754 755 * normalize: bool --- Controls whether or not the returned manifest 756 text is normalized. Default `False`. 757 758 * only_committed: bool --- Controls whether or not this method uploads 759 pending data to Keep before building and returning the manifest text. 760 If `False` (the default), this method will finish uploading all data 761 to Keep, then return the final manifest. If `True`, this method will 762 build and return a manifest that only refers to the data that has 763 finished uploading at the time this method was called. 764 """ 765 if not only_committed: 766 self._my_block_manager().commit_all() 767 return self._get_manifest_text(stream_name, strip, normalize, 768 only_committed=only_committed) 769 770 @synchronized 771 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): 772 """Get the manifest text for this collection, sub collections and files. 773 774 :stream_name: 775 Name to use for this stream (directory) 776 777 :strip: 778 If True, remove signing tokens from block locators if present. 779 If False (default), block locators are left unchanged. 
780 781 :normalize: 782 If True, always export the manifest text in normalized form 783 even if the Collection is not modified. If False (default) and the collection 784 is not modified, return the original manifest text even if it is not 785 in normalized form. 786 787 :only_committed: 788 If True, only include blocks that were already committed to Keep. 789 790 """ 791 792 if not self.committed() or self._manifest_text is None or normalize: 793 stream = {} 794 buf = [] 795 sorted_keys = sorted(self.keys()) 796 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]: 797 # Create a stream per file `k` 798 arvfile = self[filename] 799 filestream = [] 800 for segment in arvfile.segments(): 801 loc = segment.locator 802 if arvfile.parent._my_block_manager().is_bufferblock(loc): 803 if only_committed: 804 continue 805 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator() 806 if strip: 807 loc = KeepLocator(loc).stripped() 808 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 809 segment.segment_offset, segment.range_size)) 810 stream[filename] = filestream 811 if stream: 812 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n") 813 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]: 814 buf.append(self[dirname].manifest_text( 815 stream_name=os.path.join(stream_name, dirname), 816 strip=strip, normalize=True, only_committed=only_committed)) 817 return "".join(buf) 818 else: 819 if strip: 820 return self.stripped_manifest() 821 else: 822 return self._manifest_text 823 824 @synchronized 825 def _copy_remote_blocks(self, remote_blocks={}): 826 """Scan through the entire collection and ask Keep to copy remote blocks. 827 828 When accessing a remote collection, blocks will have a remote signature 829 (+R instead of +A). Collect these signatures and request Keep to copy the 830 blocks to the local cluster, returning local (+A) signatures. 
831 832 :remote_blocks: 833 Shared cache of remote to local block mappings. This is used to avoid 834 doing extra work when blocks are shared by more than one file in 835 different subdirectories. 836 837 """ 838 for item in self: 839 remote_blocks = self[item]._copy_remote_blocks(remote_blocks) 840 return remote_blocks 841 842 @synchronized 843 def diff( 844 self, 845 end_collection: 'RichCollectionBase', 846 prefix: str=".", 847 holding_collection: Optional['Collection']=None, 848 ) -> ChangeList: 849 """Build a list of differences between this collection and another 850 851 Arguments: 852 853 * end_collection: arvados.collection.RichCollectionBase --- A 854 collection object with the desired end state. The returned diff 855 list will describe how to go from the current collection object 856 `self` to `end_collection`. 857 858 * prefix: str --- The name to use for this collection's stream in 859 the diff list. Default `'.'`. 860 861 * holding_collection: arvados.collection.Collection | None --- A 862 collection object used to hold objects for the returned diff 863 list. By default, a new empty collection is created. 
864 """ 865 changes = [] 866 if holding_collection is None: 867 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep()) 868 for k in self: 869 if k not in end_collection: 870 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, ""))) 871 for k in end_collection: 872 if k in self: 873 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection): 874 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection)) 875 elif end_collection[k] != self[k]: 876 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 877 else: 878 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 879 else: 880 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, ""))) 881 return changes 882 883 @must_be_writable 884 @synchronized 885 def apply(self, changes: ChangeList) -> None: 886 """Apply a list of changes from to this collection 887 888 This method takes a list of changes generated by 889 `RichCollectionBase.diff` and applies it to this 890 collection. Afterward, the state of this collection object will 891 match the state of `end_collection` passed to `diff`. If a change 892 conflicts with a local change, it will be saved to an alternate path 893 indicating the conflict. 894 895 Arguments: 896 897 * changes: arvados.collection.ChangeList --- The list of differences 898 generated by `RichCollectionBase.diff`. 
899 """ 900 if changes: 901 self.set_committed(False) 902 for change in changes: 903 event_type = change[0] 904 path = change[1] 905 initial = change[2] 906 local = self.find(path) 907 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S", 908 time.gmtime())) 909 if event_type == ADD: 910 if local is None: 911 # No local file at path, safe to copy over new file 912 self.copy(initial, path) 913 elif local is not None and local != initial: 914 # There is already local file and it is different: 915 # save change to conflict file. 916 self.copy(initial, conflictpath) 917 elif event_type == MOD or event_type == TOK: 918 final = change[3] 919 if local == initial: 920 # Local matches the "initial" item so it has not 921 # changed locally and is safe to update. 922 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile): 923 # Replace contents of local file with new contents 924 local.replace_contents(final) 925 else: 926 # Overwrite path with new item; this can happen if 927 # path was a file and is now a collection or vice versa 928 self.copy(final, path, overwrite=True) 929 else: 930 # Local is missing (presumably deleted) or local doesn't 931 # match the "start" value, so save change to conflict file 932 self.copy(final, conflictpath) 933 elif event_type == DEL: 934 if local == initial: 935 # Local item matches "initial" value, so it is safe to remove. 936 self.remove(path, recursive=True) 937 # else, the file is modified or already removed, in either 938 # case we don't want to try to remove it. 939 940 def portable_data_hash(self) -> str: 941 """Get the portable data hash for this collection's manifest""" 942 if self._manifest_locator and self.committed(): 943 # If the collection is already saved on the API server, and it's committed 944 # then return API server's PDH response. 
945 return self._portable_data_hash 946 else: 947 stripped = self.portable_manifest_text().encode() 948 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) 949 950 @synchronized 951 def subscribe(self, callback: ChangeCallback) -> None: 952 """Set a notify callback for changes to this collection 953 954 Arguments: 955 956 * callback: arvados.collection.ChangeCallback --- The callable to 957 call each time the collection is changed. 958 """ 959 if self._callback is None: 960 self._callback = callback 961 else: 962 raise errors.ArgumentError("A callback is already set on this collection.") 963 964 @synchronized 965 def unsubscribe(self) -> None: 966 """Remove any notify callback set for changes to this collection""" 967 if self._callback is not None: 968 self._callback = None 969 970 @synchronized 971 def notify( 972 self, 973 event: ChangeType, 974 collection: 'RichCollectionBase', 975 name: str, 976 item: CollectionItem, 977 ) -> None: 978 """Notify any subscribed callback about a change to this collection 979 980 .. ATTENTION:: Internal 981 This method is only meant to be used by other Collection methods. 982 983 If a callback has been registered with `RichCollectionBase.subscribe`, 984 it will be called with information about a change to this collection. 985 Then this notification will be propagated to this collection's root. 986 987 Arguments: 988 989 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to 990 the collection. 991 992 * collection: arvados.collection.RichCollectionBase --- The 993 collection that was modified. 994 995 * name: str --- The name of the file or stream within `collection` that 996 was modified. 997 998 * item: arvados.arvfile.ArvadosFile | 999 arvados.collection.Subcollection --- The new contents at `name` 1000 within `collection`. 
1001 """ 1002 if self._callback: 1003 self._callback(event, collection, name, item) 1004 self.root_collection().notify(event, collection, name, item) 1005 1006 @synchronized 1007 def __eq__(self, other: Any) -> bool: 1008 """Indicate whether this collection object is equal to another""" 1009 if other is self: 1010 return True 1011 if not isinstance(other, RichCollectionBase): 1012 return False 1013 if len(self._items) != len(other): 1014 return False 1015 for k in self._items: 1016 if k not in other: 1017 return False 1018 if self._items[k] != other[k]: 1019 return False 1020 return True 1021 1022 def __ne__(self, other: Any) -> bool: 1023 """Indicate whether this collection object is not equal to another""" 1024 return not self.__eq__(other) 1025 1026 @synchronized 1027 def flush(self) -> None: 1028 """Upload any pending data to Keep""" 1029 for e in listvalues(self): 1030 e.flush()
Base class for Collection classes
    def writable(self) -> bool:
        """Indicate whether this collection object can be modified

        This method returns `False` if this object is a `CollectionReader`,
        else `True`.
        """
        # Abstract hook: subclasses override this (the docstring above names
        # CollectionReader as the read-only case).
        raise NotImplementedError()
Indicate whether this collection object can be modified
This method returns `False` if this object is a `CollectionReader`, else `True`.
    def root_collection(self) -> 'Collection':
        """Get this collection's root collection object

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the source Collection object.
        """
        # Abstract hook: overridden by subclasses to return the enclosing
        # root Collection.
        raise NotImplementedError()
Get this collection’s root collection object
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the source Collection object.
    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this collection

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the name of the stream you opened.
        """
        # Abstract hook: overridden by subclasses to return the stream path.
        raise NotImplementedError()
Get the name of the manifest stream represented by this collection
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the name of the stream you opened.
210 @synchronized 211 def has_remote_blocks(self) -> bool: 212 """Indiciate whether the collection refers to remote data 213 214 Returns `True` if the collection manifest includes any Keep locators 215 with a remote hint (`+R`), else `False`. 216 """ 217 if self._has_remote_blocks: 218 return True 219 for item in self: 220 if self[item].has_remote_blocks(): 221 return True 222 return False
Indicate whether the collection refers to remote data
Returns `True` if the collection manifest includes any Keep locators with a remote hint (`+R`), else `False`.
224 @synchronized 225 def set_has_remote_blocks(self, val: bool) -> None: 226 """Cache whether this collection refers to remote blocks 227 228 .. ATTENTION:: Internal 229 This method is only meant to be used by other Collection methods. 230 231 Set this collection's cached "has remote blocks" flag to the given 232 value. 233 """ 234 self._has_remote_blocks = val 235 if self.parent: 236 self.parent.set_has_remote_blocks(val)
Cache whether this collection refers to remote blocks
Set this collection’s cached “has remote blocks” flag to the given value.
238 @must_be_writable 239 @synchronized 240 def find_or_create( 241 self, 242 path: str, 243 create_type: CreateType, 244 ) -> CollectionItem: 245 """Get the item at the given path, creating it if necessary 246 247 If `path` refers to a stream in this collection, returns a 248 corresponding `Subcollection` object. If `path` refers to a file in 249 this collection, returns a corresponding 250 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 251 this collection, then this method creates a new object and returns 252 it, creating parent streams as needed. The type of object created is 253 determined by the value of `create_type`. 254 255 Arguments: 256 257 * path: str --- The path to find or create within this collection. 258 259 * create_type: Literal[COLLECTION, FILE] --- The type of object to 260 create at `path` if one does not exist. Passing `COLLECTION` 261 creates a stream and returns the corresponding 262 `Subcollection`. Passing `FILE` creates a new file and returns the 263 corresponding `arvados.arvfile.ArvadosFile`. 264 """ 265 pathcomponents = path.split("/", 1) 266 if pathcomponents[0]: 267 item = self._items.get(pathcomponents[0]) 268 if len(pathcomponents) == 1: 269 if item is None: 270 # create new file 271 if create_type == COLLECTION: 272 item = Subcollection(self, pathcomponents[0]) 273 else: 274 item = ArvadosFile(self, pathcomponents[0]) 275 self._items[pathcomponents[0]] = item 276 self.set_committed(False) 277 self.notify(ADD, self, pathcomponents[0], item) 278 return item 279 else: 280 if item is None: 281 # create new collection 282 item = Subcollection(self, pathcomponents[0]) 283 self._items[pathcomponents[0]] = item 284 self.set_committed(False) 285 self.notify(ADD, self, pathcomponents[0], item) 286 if isinstance(item, RichCollectionBase): 287 return item.find_or_create(pathcomponents[1], create_type) 288 else: 289 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 290 else: 291 return self
Get the item at the given path, creating it if necessary
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method creates a new object and returns
it, creating parent streams as needed. The type of object created is
determined by the value of create_type
.
Arguments:
path: str — The path to find or create within this collection.
create_type: Literal[COLLECTION, FILE] — The type of object to create at `path` if one does not exist. Passing `COLLECTION` creates a stream and returns the corresponding `Subcollection`. Passing `FILE` creates a new file and returns the corresponding `arvados.arvfile.ArvadosFile`.
293 @synchronized 294 def find(self, path: str) -> CollectionItem: 295 """Get the item at the given path 296 297 If `path` refers to a stream in this collection, returns a 298 corresponding `Subcollection` object. If `path` refers to a file in 299 this collection, returns a corresponding 300 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in 301 this collection, then this method raises `NotADirectoryError`. 302 303 Arguments: 304 305 * path: str --- The path to find or create within this collection. 306 """ 307 if not path: 308 raise errors.ArgumentError("Parameter 'path' is empty.") 309 310 pathcomponents = path.split("/", 1) 311 if pathcomponents[0] == '': 312 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 313 314 item = self._items.get(pathcomponents[0]) 315 if item is None: 316 return None 317 elif len(pathcomponents) == 1: 318 return item 319 else: 320 if isinstance(item, RichCollectionBase): 321 if pathcomponents[1]: 322 return item.find(pathcomponents[1]) 323 else: 324 return item 325 else: 326 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
Get the item at the given path
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method raises NotADirectoryError
.
Arguments:
- path: str — The path to find or create within this collection.
328 @synchronized 329 def mkdirs(self, path: str) -> 'Subcollection': 330 """Create and return a subcollection at `path` 331 332 If `path` exists within this collection, raises `FileExistsError`. 333 Otherwise, creates a stream at that path and returns the 334 corresponding `Subcollection`. 335 """ 336 if self.find(path) != None: 337 raise IOError(errno.EEXIST, "Directory or file exists", path) 338 339 return self.find_or_create(path, COLLECTION)
Create and return a subcollection at path
If path
exists within this collection, raises FileExistsError
.
Otherwise, creates a stream at that path and returns the
corresponding Subcollection
.
    def open(
        self,
        path: str,
        mode: str="r",
        encoding: Optional[str]=None
    ) -> IO:
        """Open a file-like object within the collection

        This method returns a file-like object that can read and/or write the
        file located at `path` within the collection. If you attempt to write
        a `path` that does not exist, the file is created with `find_or_create`.
        If the file cannot be opened for any other reason, this method raises
        `OSError` with an appropriate errno.

        Arguments:

        * path: str --- The path of the file to open within this collection

        * mode: str --- The mode to open this file. Supports all the same
          values as `builtins.open`.

        * encoding: str | None --- The text encoding of the file. Only used
          when the file is opened in text mode. The default is
          platform-dependent.

        """
        # Accept only modes of the shape r/w/a + optional b|t + optional "+",
        # e.g. "r", "wb", "at+".  NOTE(review): flag-swapped forms such as
        # "r+b", which builtins.open accepts, are rejected by this regex.
        if not re.search(r'^[rwa][bt]?\+?$', mode):
            raise errors.ArgumentError("Invalid mode {!r}".format(mode))

        # Plain "r"/"rb"/"rt" is read-only; every other mode needs a writable
        # collection and creates the file on demand.
        if mode[0] == 'r' and '+' not in mode:
            fclass = ArvadosFileReader
            arvfile = self.find(path)
        elif not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        else:
            fclass = ArvadosFileWriter
            arvfile = self.find_or_create(path, FILE)

        if arvfile is None:
            raise IOError(errno.ENOENT, "File not found", path)
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory", path)

        # "w" modes truncate any existing content before use.
        if mode[0] == 'w':
            arvfile.truncate(0)

        # The underlying Arvados file is always opened in binary mode: strip
        # any b/t flag from the request and re-add "b" (e.g. "rt+" -> "rb+").
        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
        if 'b' not in mode:
            # Text mode: layer a buffered wrapper plus TextIOWrapper so
            # callers read/write str with the requested encoding.
            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
        return f
Open a file-like object within the collection
This method returns a file-like object that can read and/or write the
file located at path
within the collection. If you attempt to write
a path
that does not exist, the file is created with find_or_create
.
If the file cannot be opened for any other reason, this method raises
OSError
with an appropriate errno.
Arguments:
path: str — The path of the file to open within this collection
mode: str — The mode to open this file. Supports all the same values as
builtins.open
.encoding: str | None — The text encoding of the file. Only used when the file is opened in text mode. The default is platform-dependent.
394 def modified(self) -> bool: 395 """Indicate whether this collection has an API server record 396 397 Returns `False` if this collection corresponds to a record loaded from 398 the API server, `True` otherwise. 399 """ 400 return not self.committed()
Indicate whether this collection has an API server record
Returns False
if this collection corresponds to a record loaded from
the API server, True
otherwise.
    @synchronized
    def committed(self):
        """Indicate whether this collection has an API server record

        Returns `True` if this collection corresponds to a record loaded from
        the API server, `False` otherwise.
        """
        # Cached flag maintained by set_committed(); access is guarded by
        # the @synchronized decorator.
        return self._committed
Indicate whether this collection has an API server record
Returns True
if this collection corresponds to a record loaded from
the API server, False
otherwise.
    @synchronized
    def set_committed(self, value: bool=True):
        """Cache whether this collection has an API server record

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        Set this collection's cached "committed" flag to the given
        value and propagates it as needed.
        """
        if value == self._committed:
            # No state change; skip redundant propagation.
            return
        if value:
            # Marking committed propagates *down*: every child item is now
            # in sync with the saved record.
            for k,v in listitems(self._items):
                v.set_committed(True)
            self._committed = True
        else:
            # Marking dirty propagates *up*: a modified child means every
            # ancestor's saved record is out of date too.
            self._committed = False
            if self.parent is not None:
                self.parent.set_committed(False)
Cache whether this collection has an API server record
Set this collection’s cached “committed” flag to the given value and propagates it as needed.
    @synchronized
    def keys(self) -> Iterator[str]:
        """Iterate names of streams and files in this collection

        This method does not recurse. It only iterates the contents of this
        collection's corresponding stream.
        """
        # NOTE(review): on Python 3 this is a dict view (KeysView), not a
        # true iterator as the annotation suggests; both are iterable.
        return self._items.keys()
Iterate names of streams and files in this collection
This method does not recurse. It only iterates the contents of this collection’s corresponding stream.
    @synchronized
    def values(self) -> List[CollectionItem]:
        """Get a list of objects in this collection's stream

        The return value includes a `Subcollection` for every stream, and an
        `arvados.arvfile.ArvadosFile` for every file, directly within this
        collection's stream. This method does not recurse.
        """
        # listvalues (future.utils) returns a real list on both Python 2
        # and 3, matching the declared List return type.
        return listvalues(self._items)
Get a list of objects in this collection’s stream
The return value includes a `Subcollection` for every stream, and an
`arvados.arvfile.ArvadosFile` for every file, directly within this
collection’s stream. This method does not recurse.
@synchronized
def items(self) -> List[Tuple[str, CollectionItem]]:
    """Get a list of `(name, object)` tuples from this collection's stream

    The return value includes a `Subcollection` for every stream, and an
    `arvados.arvfile.ArvadosFile` for every file, directly within this
    collection's stream. This method does not recurse.
    """
    # Snapshot the current (name, item) pairs as a list (not a live view).
    pairs = listitems(self._items)
    return pairs
Get a list of `(name, object)` tuples from this collection’s stream
The return value includes a `Subcollection` for every stream, and an
`arvados.arvfile.ArvadosFile` for every file, directly within this
collection’s stream. This method does not recurse.
def exists(self, path: str) -> bool:
    """Indicate whether this collection includes an item at `path`

    This method returns `True` if `path` refers to a stream or file within
    this collection, else `False`.

    Arguments:

    * path: str --- The path to check for existence within this collection
    """
    # find() returns None for a missing path; map that to a boolean.
    found = self.find(path)
    return found is not None
Indicate whether this collection includes an item at `path`
This method returns `True` if `path` refers to a stream or file within
this collection, else `False`.
Arguments:
- path: str — The path to check for existence within this collection
@must_be_writable
@synchronized
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the file or stream at `path`

    Arguments:

    * path: str --- The path of the item to remove from the collection

    * recursive: bool --- Controls the method's behavior if `path` refers
      to a nonempty stream. If `False` (the default), this method raises
      `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
      items under the stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    pathcomponents = path.split("/", 1)
    item = self._items.get(pathcomponents[0])
    if item is None:
        raise IOError(errno.ENOENT, "File not found", path)
    if len(pathcomponents) == 1:
        # `item` already holds self._items[pathcomponents[0]]; reuse it
        # instead of re-looking it up three more times.
        if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
            raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
        del self._items[pathcomponents[0]]
        self.set_committed(False)
        self.notify(DEL, self, pathcomponents[0], item)
    else:
        # Delegate removal of the rest of the path to the substream.
        item.remove(pathcomponents[1], recursive=recursive)
Remove the file or stream at path
Arguments:
path: str — The path of the item to remove from the collection
recursive: bool — Controls the method’s behavior if `path` refers to a
nonempty stream. If `False` (the default), this method raises `OSError`
with errno `ENOTEMPTY`. If `True`, this method removes all items under
the stream.
@must_be_writable
@synchronized
def add(
    self,
    source_obj: CollectionItem,
    target_name: str,
    overwrite: bool=False,
    reparent: bool=False,
) -> None:
    """Copy or move a file or subcollection object to this collection

    Arguments:

    * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
      to add to this collection

    * target_name: str --- The path inside this collection where
      `source_obj` should be added.

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_name`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_name` will be replaced with `source_obj`.

    * reparent: bool --- Controls whether this method copies or moves
      `source_obj`. If `False` (the default), `source_obj` is copied into
      this collection. If `True`, `source_obj` is moved into this
      collection.
    """
    name_taken = target_name in self
    if name_taken and not overwrite:
        raise IOError(errno.EEXIST, "File already exists", target_name)

    # Remember what we are replacing (if anything) for the notification.
    modified_from = self[target_name] if name_taken else None

    # Actually make the move or copy.
    if reparent:
        source_obj._reparent(self, target_name)
        item = source_obj
    else:
        item = source_obj.clone(self, target_name)

    self._items[target_name] = item
    self.set_committed(False)
    if not self._has_remote_blocks and source_obj.has_remote_blocks():
        self.set_has_remote_blocks(True)

    if modified_from:
        self.notify(MOD, self, target_name, (modified_from, item))
    else:
        self.notify(ADD, self, target_name, item)
Copy or move a file or subcollection object to this collection
Arguments:
source_obj: arvados.arvfile.ArvadosFile | Subcollection — The file or subcollection to add to this collection
target_name: str — The path inside this collection where `source_obj`
should be added.
overwrite: bool — Controls the behavior of this method when the
collection already contains an object at `target_name`. If `False`
(the default), this method will raise `FileExistsError`. If `True`,
the object at `target_name` will be replaced with `source_obj`.
reparent: bool — Controls whether this method copies or moves
`source_obj`. If `False` (the default), `source_obj` is copied into
this collection. If `True`, `source_obj` is moved into this collection.
@must_be_writable
@synchronized
def copy(
    self,
    source: Union[str, CollectionItem],
    target_path: str,
    source_collection: Optional['RichCollectionBase']=None,
    overwrite: bool=False,
) -> None:
    """Copy a file or subcollection object to this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      add to this collection. If `source` is a str, the object will be
      found by looking up this path from `source_collection` (see
      below).

    * target_path: str --- The path inside this collection where the
      source object should be added.

    * source_collection: arvados.collection.Collection | None --- The
      collection to find the source object from when `source` is a
      path. Defaults to the current collection (`self`).

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_path`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_path` will be replaced with the object
      `source` refers to.
    """
    # Resolve the source object and the destination stream + name;
    # the final True means "copy" (not reparent) when resolving.
    source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
    target_dir.add(source_obj, target_name, overwrite, False)
Copy a file or subcollection object to this collection
Arguments:
source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or
subcollection to add to this collection. If `source` is a str, the
object will be found by looking up this path from `source_collection`
(see below).
target_path: str — The path inside this collection where the source
object should be added.
source_collection: Collection | None — The collection to find the
source object from when `source` is a path. Defaults to the current
collection (`self`).
overwrite: bool — Controls the behavior of this method when the
collection already contains an object at `target_path`. If `False`
(the default), this method will raise `FileExistsError`. If `True`,
the object at `target_path` will be replaced with the source object.
@must_be_writable
@synchronized
def rename(
    self,
    source: Union[str, CollectionItem],
    target_path: str,
    source_collection: Optional['RichCollectionBase']=None,
    overwrite: bool=False,
) -> None:
    """Move a file or subcollection object to this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      add to this collection. If `source` is a str, the object will be
      found by looking up this path from `source_collection` (see
      below).

    * target_path: str --- The path inside this collection where the
      source object should be added.

    * source_collection: arvados.collection.Collection | None --- The
      collection to find the source object from when `source` is a
      path. Defaults to the current collection (`self`).

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_path`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_path` will be replaced with the object
      `source` refers to.
    """
    # Resolve the source object and the destination stream + name;
    # the final False means the resolved source may later be reparented.
    source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
    if not source_obj.writable():
        # A move mutates the source collection, so it must be writable.
        raise IOError(errno.EROFS, "Source collection is read only", source)
    target_dir.add(source_obj, target_name, overwrite, True)
Move a file or subcollection object to this collection
Arguments:
source: str | arvados.arvfile.ArvadosFile | Subcollection — The file or
subcollection to add to this collection. If `source` is a str, the
object will be found by looking up this path from `source_collection`
(see below).
target_path: str — The path inside this collection where the source
object should be added.
source_collection: Collection | None — The collection to find the
source object from when `source` is a path. Defaults to the current
collection (`self`).
overwrite: bool — Controls the behavior of this method when the
collection already contains an object at `target_path`. If `False`
(the default), this method will raise `FileExistsError`. If `True`,
the object at `target_path` will be replaced with the source object.
def portable_manifest_text(self, stream_name: str=".") -> str:
    """Get the portable manifest text for this collection

    The portable manifest text is normalized, and does not include access
    tokens. This method does not flush outstanding blocks to Keep.

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.
    """
    # strip=True drops access tokens; normalize=True canonicalizes order.
    manifest = self._get_manifest_text(stream_name, True, True)
    return manifest
Get the portable manifest text for this collection
The portable manifest text is normalized, and does not include access tokens. This method does not flush outstanding blocks to Keep.
Arguments:
- stream_name: str — The name to use for this collection’s stream in
the generated manifest. Default `'.'`.
@synchronized
def manifest_text(
    self,
    stream_name: str=".",
    strip: bool=False,
    normalize: bool=False,
    only_committed: bool=False,
) -> str:
    """Get the manifest text for this collection

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.

    * strip: bool --- Controls whether or not the returned manifest text
      includes access tokens. If `False` (the default), the manifest text
      will include access tokens. If `True`, the manifest text will not
      include access tokens.

    * normalize: bool --- Controls whether or not the returned manifest
      text is normalized. Default `False`.

    * only_committed: bool --- Controls whether or not this method uploads
      pending data to Keep before building and returning the manifest text.
      If `False` (the default), this method will finish uploading all data
      to Keep, then return the final manifest. If `True`, this method will
      build and return a manifest that only refers to the data that has
      finished uploading at the time this method was called.
    """
    if not only_committed:
        # Finish uploading all pending data so the manifest is final.
        self._my_block_manager().commit_all()
    return self._get_manifest_text(
        stream_name,
        strip,
        normalize,
        only_committed=only_committed,
    )
Get the manifest text for this collection
Arguments:
stream_name: str — The name to use for this collection’s stream in the
generated manifest. Default `'.'`.
strip: bool — Controls whether or not the returned manifest text
includes access tokens. If `False` (the default), the manifest text
will include access tokens. If `True`, the manifest text will not
include access tokens.
normalize: bool — Controls whether or not the returned manifest text is
normalized. Default `False`.
only_committed: bool — Controls whether or not this method uploads
pending data to Keep before building and returning the manifest text.
If `False` (the default), this method will finish uploading all data to
Keep, then return the final manifest. If `True`, this method will build
and return a manifest that only refers to the data that has finished
uploading at the time this method was called.
@synchronized
def diff(
    self,
    end_collection: 'RichCollectionBase',
    prefix: str=".",
    holding_collection: Optional['Collection']=None,
) -> ChangeList:
    """Build a list of differences between this collection and another

    Arguments:

    * end_collection: arvados.collection.RichCollectionBase --- A
      collection object with the desired end state. The returned diff
      list will describe how to go from the current collection object
      `self` to `end_collection`.

    * prefix: str --- The name to use for this collection's stream in
      the diff list. Default `'.'`.

    * holding_collection: arvados.collection.Collection | None --- A
      collection object used to hold objects for the returned diff
      list. By default, a new empty collection is created.
    """
    if holding_collection is None:
        holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())

    def snapshot(obj):
        # Detach a copy of obj into the holding collection for the diff.
        return obj.clone(holding_collection, "")

    changes = []
    # Anything present here but absent from the end state gets deleted.
    for name in self:
        if name not in end_collection:
            changes.append((DEL, os.path.join(prefix, name), snapshot(self[name])))
    # Walk the end state: new names are ADDs, shared names are compared.
    for name in end_collection:
        joined = os.path.join(prefix, name)
        if name not in self:
            changes.append((ADD, joined, snapshot(end_collection[name])))
            continue
        mine = self[name]
        theirs = end_collection[name]
        if isinstance(theirs, Subcollection) and isinstance(mine, Subcollection):
            # Recurse into matching substreams.
            changes.extend(mine.diff(theirs, joined, holding_collection))
        elif theirs != mine:
            changes.append((MOD, joined, snapshot(mine), snapshot(theirs)))
        else:
            # Content equal; record a token-only change.
            changes.append((TOK, joined, snapshot(mine), snapshot(theirs)))
    return changes
Build a list of differences between this collection and another
Arguments:
end_collection: RichCollectionBase — A collection object with the
desired end state. The returned diff list will describe how to go from
the current collection object `self` to `end_collection`.
prefix: str — The name to use for this collection’s stream in the diff
list. Default `'.'`.
holding_collection: Collection | None — A collection object used to
hold objects for the returned diff list. By default, a new empty
collection is created.
@must_be_writable
@synchronized
def apply(self, changes: ChangeList) -> None:
    """Apply a list of changes to this collection

    This method takes a list of changes generated by
    `RichCollectionBase.diff` and applies it to this
    collection. Afterward, the state of this collection object will
    match the state of `end_collection` passed to `diff`. If a change
    conflicts with a local change, it will be saved to an alternate path
    indicating the conflict.

    Arguments:

    * changes: arvados.collection.ChangeList --- The list of differences
      generated by `RichCollectionBase.diff`.
    """
    if changes:
        # Any change at all makes this collection dirty.
        self.set_committed(False)
    for change in changes:
        event_type = change[0]
        path = change[1]
        initial = change[2]
        local = self.find(path)
        # Timestamped alternate path used when a change conflicts with
        # a local modification.
        conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                time.gmtime()))
        if event_type == ADD:
            if local is None:
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
            elif local is not None and local != initial:
                # There is already local file and it is different:
                # save change to conflict file.
                self.copy(initial, conflictpath)
        elif event_type == MOD or event_type == TOK:
            final = change[3]
            if local == initial:
                # Local matches the "initial" item so it has not
                # changed locally and is safe to update.
                if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                    # Replace contents of local file with new contents
                    local.replace_contents(final)
                else:
                    # Overwrite path with new item; this can happen if
                    # path was a file and is now a collection or vice versa
                    self.copy(final, path, overwrite=True)
            else:
                # Local is missing (presumably deleted) or local doesn't
                # match the "start" value, so save change to conflict file
                self.copy(final, conflictpath)
        elif event_type == DEL:
            if local == initial:
                # Local item matches "initial" value, so it is safe to remove.
                self.remove(path, recursive=True)
            # else, the file is modified or already removed, in either
            # case we don't want to try to remove it.
Apply a list of changes to this collection
This method takes a list of changes generated by
RichCollectionBase.diff
and applies it to this
collection. Afterward, the state of this collection object will
match the state of end_collection
passed to diff
. If a change
conflicts with a local change, it will be saved to an alternate path
indicating the conflict.
Arguments:
- changes: arvados.collection.ChangeList — The list of differences
generated by
RichCollectionBase.diff
.
def portable_data_hash(self) -> str:
    """Get the portable data hash for this collection's manifest"""
    # If the collection is already saved on the API server, and it's
    # committed, then return the API server's PDH response.
    if self._manifest_locator and self.committed():
        return self._portable_data_hash
    # Otherwise compute the PDH locally: md5 of the stripped portable
    # manifest, plus its byte length.
    stripped = self.portable_manifest_text().encode()
    digest = hashlib.md5(stripped).hexdigest()
    return '{}+{}'.format(digest, len(stripped))
Get the portable data hash for this collection’s manifest
@synchronized
def subscribe(self, callback: ChangeCallback) -> None:
    """Set a notify callback for changes to this collection

    Arguments:

    * callback: arvados.collection.ChangeCallback --- The callable to
      call each time the collection is changed.
    """
    # Only one subscriber is supported at a time.
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback
Set a notify callback for changes to this collection
Arguments:
- callback: arvados.collection.ChangeCallback — The callable to call each time the collection is changed.
@synchronized
def unsubscribe(self) -> None:
    """Remove any notify callback set for changes to this collection"""
    # Clearing an already-clear callback is a harmless no-op.
    self._callback = None
Remove any notify callback set for changes to this collection
@synchronized
def notify(
    self,
    event: ChangeType,
    collection: 'RichCollectionBase',
    name: str,
    item: CollectionItem,
) -> None:
    """Notify any subscribed callback about a change to this collection

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    If a callback has been registered with `RichCollectionBase.subscribe`,
    it will be called with information about a change to this collection.
    Then this notification will be propagated to this collection's root.

    Arguments:

    * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
      the collection.

    * collection: arvados.collection.RichCollectionBase --- The
      collection that was modified.

    * name: str --- The name of the file or stream within `collection` that
      was modified.

    * item: arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The new contents at `name`
      within `collection`.
    """
    callback = self._callback
    if callback:
        callback(event, collection, name, item)
    # Bubble the notification up so the root's subscriber also sees it.
    self.root_collection().notify(event, collection, name, item)
Notify any subscribed callback about a change to this collection
If a callback has been registered with `RichCollectionBase.subscribe`,
it will be called with information about a change to this collection.
Then this notification will be propagated to this collection’s root.
Arguments:
event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.
collection: RichCollectionBase — The collection that was modified.
name: str — The name of the file or stream within `collection` that was modified.
item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at
`name` within `collection`.
@synchronized
def flush(self) -> None:
    """Upload any pending data to Keep"""
    # Delegate to every item in this stream; subcollections recurse.
    for entry in listvalues(self):
        entry.flush()
Upload any pending data to Keep
Inherited Members
1033class Collection(RichCollectionBase): 1034 """Read and manipulate an Arvados collection 1035 1036 This class provides a high-level interface to create, read, and update 1037 Arvados collections and their contents. Refer to the Arvados Python SDK 1038 cookbook for [an introduction to using the Collection class][cookbook]. 1039 1040 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections 1041 """ 1042 1043 def __init__(self, manifest_locator_or_text: Optional[str]=None, 1044 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, 1045 keep_client: Optional['arvados.keep.KeepClient']=None, 1046 num_retries: int=10, 1047 parent: Optional['Collection']=None, 1048 apiconfig: Optional[Mapping[str, str]]=None, 1049 block_manager: Optional['arvados.arvfile._BlockManager']=None, 1050 replication_desired: Optional[int]=None, 1051 storage_classes_desired: Optional[List[str]]=None, 1052 put_threads: Optional[int]=None): 1053 """Initialize a Collection object 1054 1055 Arguments: 1056 1057 * manifest_locator_or_text: str | None --- This string can contain a 1058 collection manifest text, portable data hash, or UUID. When given a 1059 portable data hash or UUID, this instance will load a collection 1060 record from the API server. Otherwise, this instance will represent a 1061 new collection without an API server record. The default value `None` 1062 instantiates a new collection with an empty manifest. 1063 1064 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The 1065 Arvados API client object this instance uses to make requests. If 1066 none is given, this instance creates its own client using the 1067 settings from `apiconfig` (see below). If your client instantiates 1068 many Collection objects, you can help limit memory utilization by 1069 calling `arvados.api.api` to construct an 1070 `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client` 1071 for every Collection. 
1072 1073 * keep_client: arvados.keep.KeepClient | None --- The Keep client 1074 object this instance uses to make requests. If none is given, this 1075 instance creates its own client using its `api_client`. 1076 1077 * num_retries: int --- The number of times that client requests are 1078 retried. Default 10. 1079 1080 * parent: arvados.collection.Collection | None --- The parent Collection 1081 object of this instance, if any. This argument is primarily used by 1082 other Collection methods; user client code shouldn't need to use it. 1083 1084 * apiconfig: Mapping[str, str] | None --- A mapping with entries for 1085 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally 1086 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the 1087 Collection object constructs one from these settings. If no 1088 mapping is provided, calls `arvados.config.settings` to get these 1089 parameters from user configuration. 1090 1091 * block_manager: arvados.arvfile._BlockManager | None --- The 1092 _BlockManager object used by this instance to coordinate reading 1093 and writing Keep data blocks. If none is given, this instance 1094 constructs its own. This argument is primarily used by other 1095 Collection methods; user client code shouldn't need to use it. 1096 1097 * replication_desired: int | None --- This controls both the value of 1098 the `replication_desired` field on API collection records saved by 1099 this class, as well as the number of Keep services that the object 1100 writes new data blocks to. If none is given, uses the default value 1101 configured for the cluster. 1102 1103 * storage_classes_desired: list[str] | None --- This controls both 1104 the value of the `storage_classes_desired` field on API collection 1105 records saved by this class, as well as selecting which specific 1106 Keep services the object writes new data blocks to. If none is 1107 given, defaults to an empty list. 
1108 1109 * put_threads: int | None --- The number of threads to run 1110 simultaneously to upload data blocks to Keep. This value is used when 1111 building a new `block_manager`. It is unused when a `block_manager` 1112 is provided. 1113 """ 1114 1115 if storage_classes_desired and type(storage_classes_desired) is not list: 1116 raise errors.ArgumentError("storage_classes_desired must be list type.") 1117 1118 super(Collection, self).__init__(parent) 1119 self._api_client = api_client 1120 self._keep_client = keep_client 1121 1122 # Use the keep client from ThreadSafeApiCache 1123 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache): 1124 self._keep_client = self._api_client.keep 1125 1126 self._block_manager = block_manager 1127 self.replication_desired = replication_desired 1128 self._storage_classes_desired = storage_classes_desired 1129 self.put_threads = put_threads 1130 1131 if apiconfig: 1132 self._config = apiconfig 1133 else: 1134 self._config = config.settings() 1135 1136 self.num_retries = num_retries 1137 self._manifest_locator = None 1138 self._manifest_text = None 1139 self._portable_data_hash = None 1140 self._api_response = None 1141 self._past_versions = set() 1142 1143 self.lock = threading.RLock() 1144 self.events = None 1145 1146 if manifest_locator_or_text: 1147 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1148 self._manifest_locator = manifest_locator_or_text 1149 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1150 self._manifest_locator = manifest_locator_or_text 1151 if not self._has_local_collection_uuid(): 1152 self._has_remote_blocks = True 1153 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1154 self._manifest_text = manifest_locator_or_text 1155 if '+R' in self._manifest_text: 1156 self._has_remote_blocks = True 1157 else: 1158 raise errors.ArgumentError( 1159 "Argument to CollectionReader is not a manifest or a 
collection UUID") 1160 1161 try: 1162 self._populate() 1163 except errors.SyntaxError as e: 1164 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None 1165 1166 def storage_classes_desired(self) -> List[str]: 1167 """Get this collection's `storage_classes_desired` value""" 1168 return self._storage_classes_desired or [] 1169 1170 def root_collection(self) -> 'Collection': 1171 return self 1172 1173 def get_properties(self) -> Properties: 1174 """Get this collection's properties 1175 1176 This method always returns a dict. If this collection object does not 1177 have an associated API record, or that record does not have any 1178 properties set, this method returns an empty dict. 1179 """ 1180 if self._api_response and self._api_response["properties"]: 1181 return self._api_response["properties"] 1182 else: 1183 return {} 1184 1185 def get_trash_at(self) -> Optional[datetime.datetime]: 1186 """Get this collection's `trash_at` field 1187 1188 This method parses the `trash_at` field of the collection's API 1189 record and returns a datetime from it. If that field is not set, or 1190 this collection object does not have an associated API record, 1191 returns None. 1192 """ 1193 if self._api_response and self._api_response["trash_at"]: 1194 try: 1195 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1196 except ValueError: 1197 return None 1198 else: 1199 return None 1200 1201 def stream_name(self) -> str: 1202 return "." 1203 1204 def writable(self) -> bool: 1205 return True 1206 1207 @synchronized 1208 def known_past_version( 1209 self, 1210 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] 1211 ) -> bool: 1212 """Indicate whether an API record for this collection has been seen before 1213 1214 As this collection object loads records from the API server, it records 1215 their `modified_at` and `portable_data_hash` fields. 
This method accepts 1216 a 2-tuple with values for those fields, and returns `True` if the 1217 combination was previously loaded. 1218 """ 1219 return modified_at_and_portable_data_hash in self._past_versions 1220 1221 @synchronized 1222 @retry_method 1223 def update( 1224 self, 1225 other: Optional['Collection']=None, 1226 num_retries: Optional[int]=None, 1227 ) -> None: 1228 """Merge another collection's contents into this one 1229 1230 This method compares the manifest of this collection instance with 1231 another, then updates this instance's manifest with changes from the 1232 other, renaming files to flag conflicts where necessary. 1233 1234 When called without any arguments, this method reloads the collection's 1235 API record, and updates this instance with any changes that have 1236 appeared server-side. If this instance does not have a corresponding 1237 API record, this method raises `arvados.errors.ArgumentError`. 1238 1239 Arguments: 1240 1241 * other: arvados.collection.Collection | None --- The collection 1242 whose contents should be merged into this instance. When not 1243 provided, this method reloads this collection's API record and 1244 constructs a Collection object from it. If this instance does not 1245 have a corresponding API record, this method raises 1246 `arvados.errors.ArgumentError`. 1247 1248 * num_retries: int | None --- The number of times to retry reloading 1249 the collection's API record from the API server. If not specified, 1250 uses the `num_retries` provided when this instance was constructed. 
1251 """ 1252 if other is None: 1253 if self._manifest_locator is None: 1254 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1255 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1256 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and 1257 response.get("portable_data_hash") != self.portable_data_hash()): 1258 # The record on the server is different from our current one, but we've seen it before, 1259 # so ignore it because it's already been merged. 1260 # However, if it's the same as our current record, proceed with the update, because we want to update 1261 # our tokens. 1262 return 1263 else: 1264 self._remember_api_response(response) 1265 other = CollectionReader(response["manifest_text"]) 1266 baseline = CollectionReader(self._manifest_text) 1267 self.apply(baseline.diff(other)) 1268 self._manifest_text = self.manifest_text() 1269 1270 @synchronized 1271 def _my_api(self): 1272 if self._api_client is None: 1273 self._api_client = ThreadSafeApiCache(self._config, version='v1') 1274 if self._keep_client is None: 1275 self._keep_client = self._api_client.keep 1276 return self._api_client 1277 1278 @synchronized 1279 def _my_keep(self): 1280 if self._keep_client is None: 1281 if self._api_client is None: 1282 self._my_api() 1283 else: 1284 self._keep_client = KeepClient(api_client=self._api_client) 1285 return self._keep_client 1286 1287 @synchronized 1288 def _my_block_manager(self): 1289 if self._block_manager is None: 1290 copies = (self.replication_desired or 1291 self._my_api()._rootDesc.get('defaultCollectionReplication', 1292 2)) 1293 self._block_manager = _BlockManager(self._my_keep(), 1294 copies=copies, 1295 put_threads=self.put_threads, 1296 num_retries=self.num_retries, 1297 storage_classes_func=self.storage_classes_desired) 1298 return self._block_manager 1299 1300 def _remember_api_response(self, 
response): 1301 self._api_response = response 1302 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) 1303 1304 def _populate_from_api_server(self): 1305 # As in KeepClient itself, we must wait until the last 1306 # possible moment to instantiate an API client, in order to 1307 # avoid tripping up clients that don't have access to an API 1308 # server. If we do build one, make sure our Keep client uses 1309 # it. If instantiation fails, we'll fall back to the except 1310 # clause, just like any other Collection lookup 1311 # failure. Return an exception, or None if successful. 1312 self._remember_api_response(self._my_api().collections().get( 1313 uuid=self._manifest_locator).execute( 1314 num_retries=self.num_retries)) 1315 self._manifest_text = self._api_response['manifest_text'] 1316 self._portable_data_hash = self._api_response['portable_data_hash'] 1317 # If not overriden via kwargs, we should try to load the 1318 # replication_desired and storage_classes_desired from the API server 1319 if self.replication_desired is None: 1320 self.replication_desired = self._api_response.get('replication_desired', None) 1321 if self._storage_classes_desired is None: 1322 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None) 1323 1324 def _populate(self): 1325 if self._manifest_text is None: 1326 if self._manifest_locator is None: 1327 return 1328 else: 1329 self._populate_from_api_server() 1330 self._baseline_manifest = self._manifest_text 1331 self._import_manifest(self._manifest_text) 1332 1333 def _has_collection_uuid(self): 1334 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator) 1335 1336 def _has_local_collection_uuid(self): 1337 return self._has_collection_uuid and \ 1338 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0] 1339 1340 def __enter__(self): 1341 return self 1342 1343 def __exit__(self, 
exc_type, exc_value, traceback): 1344 """Exit a context with this collection instance 1345 1346 If no exception was raised inside the context block, and this 1347 collection is writable and has a corresponding API record, that 1348 record will be updated to match the state of this instance at the end 1349 of the block. 1350 """ 1351 if exc_type is None: 1352 if self.writable() and self._has_collection_uuid(): 1353 self.save() 1354 self.stop_threads() 1355 1356 def stop_threads(self) -> None: 1357 """Stop background Keep upload/download threads""" 1358 if self._block_manager is not None: 1359 self._block_manager.stop_threads() 1360 1361 @synchronized 1362 def manifest_locator(self) -> Optional[str]: 1363 """Get this collection's manifest locator, if any 1364 1365 * If this collection instance is associated with an API record with a 1366 UUID, return that. 1367 * Otherwise, if this collection instance was loaded from an API record 1368 by portable data hash, return that. 1369 * Otherwise, return `None`. 1370 """ 1371 return self._manifest_locator 1372 1373 @synchronized 1374 def clone( 1375 self, 1376 new_parent: Optional['Collection']=None, 1377 new_name: Optional[str]=None, 1378 readonly: bool=False, 1379 new_config: Optional[Mapping[str, str]]=None, 1380 ) -> 'Collection': 1381 """Create a Collection object with the same contents as this instance 1382 1383 This method creates a new Collection object with contents that match 1384 this instance's. The new collection will not be associated with any API 1385 record. 1386 1387 Arguments: 1388 1389 * new_parent: arvados.collection.Collection | None --- This value is 1390 passed to the new Collection's constructor as the `parent` 1391 argument. 1392 1393 * new_name: str | None --- This value is unused. 1394 1395 * readonly: bool --- If this value is true, this method constructs and 1396 returns a `CollectionReader`. Otherwise, it returns a mutable 1397 `Collection`. Default `False`. 
1398 1399 * new_config: Mapping[str, str] | None --- This value is passed to the 1400 new Collection's constructor as `apiconfig`. If no value is provided, 1401 defaults to the configuration passed to this instance's constructor. 1402 """ 1403 if new_config is None: 1404 new_config = self._config 1405 if readonly: 1406 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1407 else: 1408 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1409 1410 newcollection._clonefrom(self) 1411 return newcollection 1412 1413 @synchronized 1414 def api_response(self) -> Optional[Dict[str, Any]]: 1415 """Get this instance's associated API record 1416 1417 If this Collection instance has an associated API record, return it. 1418 Otherwise, return `None`. 1419 """ 1420 return self._api_response 1421 1422 def find_or_create( 1423 self, 1424 path: str, 1425 create_type: CreateType, 1426 ) -> CollectionItem: 1427 if path == ".": 1428 return self 1429 else: 1430 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type) 1431 1432 def find(self, path: str) -> CollectionItem: 1433 if path == ".": 1434 return self 1435 else: 1436 return super(Collection, self).find(path[2:] if path.startswith("./") else path) 1437 1438 def remove(self, path: str, recursive: bool=False) -> None: 1439 if path == ".": 1440 raise errors.ArgumentError("Cannot remove '.'") 1441 else: 1442 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive) 1443 1444 @must_be_writable 1445 @synchronized 1446 @retry_method 1447 def save( 1448 self, 1449 properties: Optional[Properties]=None, 1450 storage_classes: Optional[StorageClasses]=None, 1451 trash_at: Optional[datetime.datetime]=None, 1452 merge: bool=True, 1453 num_retries: Optional[int]=None, 1454 preserve_version: bool=False, 1455 ) -> str: 1456 """Save collection to an existing API record 1457 1458 This method updates the instance's corresponding 
API record to match 1459 the instance's state. If this instance does not have a corresponding API 1460 record yet, raises `AssertionError`. (To create a new API record, use 1461 `Collection.save_new`.) This method returns the saved collection 1462 manifest. 1463 1464 Arguments: 1465 1466 * properties: dict[str, Any] | None --- If provided, the API record will 1467 be updated with these properties. Note this will completely replace 1468 any existing properties. 1469 1470 * storage_classes: list[str] | None --- If provided, the API record will 1471 be updated with this value in the `storage_classes_desired` field. 1472 This value will also be saved on the instance and used for any 1473 changes that follow. 1474 1475 * trash_at: datetime.datetime | None --- If provided, the API record 1476 will be updated with this value in the `trash_at` field. 1477 1478 * merge: bool --- If `True` (the default), this method will first 1479 reload this collection's API record, and merge any new contents into 1480 this instance before saving changes. See `Collection.update` for 1481 details. 1482 1483 * num_retries: int | None --- The number of times to retry reloading 1484 the collection's API record from the API server. If not specified, 1485 uses the `num_retries` provided when this instance was constructed. 1486 1487 * preserve_version: bool --- This value will be passed to directly 1488 to the underlying API call. If `True`, the Arvados API will 1489 preserve the versions of this collection both immediately before 1490 and after the update. If `True` when the API server is not 1491 configured with collection versioning, this method raises 1492 `arvados.errors.ArgumentError`. 
1493 """ 1494 if properties and type(properties) is not dict: 1495 raise errors.ArgumentError("properties must be dictionary type.") 1496 1497 if storage_classes and type(storage_classes) is not list: 1498 raise errors.ArgumentError("storage_classes must be list type.") 1499 if storage_classes: 1500 self._storage_classes_desired = storage_classes 1501 1502 if trash_at and type(trash_at) is not datetime.datetime: 1503 raise errors.ArgumentError("trash_at must be datetime type.") 1504 1505 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1506 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1507 1508 body={} 1509 if properties: 1510 body["properties"] = properties 1511 if self.storage_classes_desired(): 1512 body["storage_classes_desired"] = self.storage_classes_desired() 1513 if trash_at: 1514 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1515 body["trash_at"] = t 1516 if preserve_version: 1517 body["preserve_version"] = preserve_version 1518 1519 if not self.committed(): 1520 if self._has_remote_blocks: 1521 # Copy any remote blocks to the local cluster. 1522 self._copy_remote_blocks(remote_blocks={}) 1523 self._has_remote_blocks = False 1524 if not self._has_collection_uuid(): 1525 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") 1526 elif not self._has_local_collection_uuid(): 1527 raise AssertionError("Collection manifest_locator is from a remote cluster. 
Use save_new() to save it on the local cluster.") 1528 1529 self._my_block_manager().commit_all() 1530 1531 if merge: 1532 self.update() 1533 1534 text = self.manifest_text(strip=False) 1535 body['manifest_text'] = text 1536 1537 self._remember_api_response(self._my_api().collections().update( 1538 uuid=self._manifest_locator, 1539 body=body 1540 ).execute(num_retries=num_retries)) 1541 self._manifest_text = self._api_response["manifest_text"] 1542 self._portable_data_hash = self._api_response["portable_data_hash"] 1543 self.set_committed(True) 1544 elif body: 1545 self._remember_api_response(self._my_api().collections().update( 1546 uuid=self._manifest_locator, 1547 body=body 1548 ).execute(num_retries=num_retries)) 1549 1550 return self._manifest_text 1551 1552 1553 @must_be_writable 1554 @synchronized 1555 @retry_method 1556 def save_new( 1557 self, 1558 name: Optional[str]=None, 1559 create_collection_record: bool=True, 1560 owner_uuid: Optional[str]=None, 1561 properties: Optional[Properties]=None, 1562 storage_classes: Optional[StorageClasses]=None, 1563 trash_at: Optional[datetime.datetime]=None, 1564 ensure_unique_name: bool=False, 1565 num_retries: Optional[int]=None, 1566 preserve_version: bool=False, 1567 ): 1568 """Save collection to a new API record 1569 1570 This method finishes uploading new data blocks and (optionally) 1571 creates a new API collection record with the provided data. If a new 1572 record is created, this instance becomes associated with that record 1573 for future updates like `save()`. This method returns the saved 1574 collection manifest. 1575 1576 Arguments: 1577 1578 * name: str | None --- The `name` field to use on the new collection 1579 record. If not specified, a generic default name is generated. 1580 1581 * create_collection_record: bool --- If `True` (the default), creates a 1582 collection record on the API server. 
If `False`, the method finishes 1583 all data uploads and only returns the resulting collection manifest 1584 without sending it to the API server. 1585 1586 * owner_uuid: str | None --- The `owner_uuid` field to use on the 1587 new collection record. 1588 1589 * properties: dict[str, Any] | None --- The `properties` field to use on 1590 the new collection record. 1591 1592 * storage_classes: list[str] | None --- The 1593 `storage_classes_desired` field to use on the new collection record. 1594 1595 * trash_at: datetime.datetime | None --- The `trash_at` field to use 1596 on the new collection record. 1597 1598 * ensure_unique_name: bool --- This value is passed directly to the 1599 Arvados API when creating the collection record. If `True`, the API 1600 server may modify the submitted `name` to ensure the collection's 1601 `name`+`owner_uuid` combination is unique. If `False` (the default), 1602 if a collection already exists with this same `name`+`owner_uuid` 1603 combination, creating a collection record will raise a validation 1604 error. 1605 1606 * num_retries: int | None --- The number of times to retry reloading 1607 the collection's API record from the API server. If not specified, 1608 uses the `num_retries` provided when this instance was constructed. 1609 1610 * preserve_version: bool --- This value will be passed to directly 1611 to the underlying API call. If `True`, the Arvados API will 1612 preserve the versions of this collection both immediately before 1613 and after the update. If `True` when the API server is not 1614 configured with collection versioning, this method raises 1615 `arvados.errors.ArgumentError`. 
1616 """ 1617 if properties and type(properties) is not dict: 1618 raise errors.ArgumentError("properties must be dictionary type.") 1619 1620 if storage_classes and type(storage_classes) is not list: 1621 raise errors.ArgumentError("storage_classes must be list type.") 1622 1623 if trash_at and type(trash_at) is not datetime.datetime: 1624 raise errors.ArgumentError("trash_at must be datetime type.") 1625 1626 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False): 1627 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.") 1628 1629 if self._has_remote_blocks: 1630 # Copy any remote blocks to the local cluster. 1631 self._copy_remote_blocks(remote_blocks={}) 1632 self._has_remote_blocks = False 1633 1634 if storage_classes: 1635 self._storage_classes_desired = storage_classes 1636 1637 self._my_block_manager().commit_all() 1638 text = self.manifest_text(strip=False) 1639 1640 if create_collection_record: 1641 if name is None: 1642 name = "New collection" 1643 ensure_unique_name = True 1644 1645 body = {"manifest_text": text, 1646 "name": name, 1647 "replication_desired": self.replication_desired} 1648 if owner_uuid: 1649 body["owner_uuid"] = owner_uuid 1650 if properties: 1651 body["properties"] = properties 1652 if self.storage_classes_desired(): 1653 body["storage_classes_desired"] = self.storage_classes_desired() 1654 if trash_at: 1655 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1656 body["trash_at"] = t 1657 if preserve_version: 1658 body["preserve_version"] = preserve_version 1659 1660 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) 1661 text = self._api_response["manifest_text"] 1662 1663 self._manifest_locator = self._api_response["uuid"] 1664 self._portable_data_hash = self._api_response["portable_data_hash"] 1665 1666 self._manifest_text = text 1667 
self.set_committed(True) 1668 1669 return text 1670 1671 _token_re = re.compile(r'(\S+)(\s+|$)') 1672 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') 1673 _segment_re = re.compile(r'(\d+):(\d+):(\S+)') 1674 1675 def _unescape_manifest_path(self, path): 1676 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) 1677 1678 @synchronized 1679 def _import_manifest(self, manifest_text): 1680 """Import a manifest into a `Collection`. 1681 1682 :manifest_text: 1683 The manifest text to import from. 1684 1685 """ 1686 if len(self) > 0: 1687 raise ArgumentError("Can only import manifest into an empty collection") 1688 1689 STREAM_NAME = 0 1690 BLOCKS = 1 1691 SEGMENTS = 2 1692 1693 stream_name = None 1694 state = STREAM_NAME 1695 1696 for token_and_separator in self._token_re.finditer(manifest_text): 1697 tok = token_and_separator.group(1) 1698 sep = token_and_separator.group(2) 1699 1700 if state == STREAM_NAME: 1701 # starting a new stream 1702 stream_name = self._unescape_manifest_path(tok) 1703 blocks = [] 1704 segments = [] 1705 streamoffset = 0 1706 state = BLOCKS 1707 self.find_or_create(stream_name, COLLECTION) 1708 continue 1709 1710 if state == BLOCKS: 1711 block_locator = self._block_re.match(tok) 1712 if block_locator: 1713 blocksize = int(block_locator.group(1)) 1714 blocks.append(Range(tok, streamoffset, blocksize, 0)) 1715 streamoffset += blocksize 1716 else: 1717 state = SEGMENTS 1718 1719 if state == SEGMENTS: 1720 file_segment = self._segment_re.match(tok) 1721 if file_segment: 1722 pos = int(file_segment.group(1)) 1723 size = int(file_segment.group(2)) 1724 name = self._unescape_manifest_path(file_segment.group(3)) 1725 if name.split('/')[-1] == '.': 1726 # placeholder for persisting an empty directory, not a real file 1727 if len(name) > 2: 1728 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) 1729 else: 1730 filepath = os.path.join(stream_name, name) 1731 try: 1732 afile = self.find_or_create(filepath, 
FILE) 1733 except IOError as e: 1734 if e.errno == errno.ENOTDIR: 1735 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None 1736 else: 1737 raise e from None 1738 if isinstance(afile, ArvadosFile): 1739 afile.add_segment(blocks, pos, size) 1740 else: 1741 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) 1742 else: 1743 # error! 1744 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok) 1745 1746 if sep == "\n": 1747 stream_name = None 1748 state = STREAM_NAME 1749 1750 self.set_committed(True) 1751 1752 @synchronized 1753 def notify( 1754 self, 1755 event: ChangeType, 1756 collection: 'RichCollectionBase', 1757 name: str, 1758 item: CollectionItem, 1759 ) -> None: 1760 if self._callback: 1761 self._callback(event, collection, name, item)
Read and manipulate an Arvados collection
This class provides a high-level interface to create, read, and update Arvados collections and their contents. Refer to the Arvados Python SDK cookbook for an introduction to using the Collection class.
1043 def __init__(self, manifest_locator_or_text: Optional[str]=None, 1044 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, 1045 keep_client: Optional['arvados.keep.KeepClient']=None, 1046 num_retries: int=10, 1047 parent: Optional['Collection']=None, 1048 apiconfig: Optional[Mapping[str, str]]=None, 1049 block_manager: Optional['arvados.arvfile._BlockManager']=None, 1050 replication_desired: Optional[int]=None, 1051 storage_classes_desired: Optional[List[str]]=None, 1052 put_threads: Optional[int]=None): 1053 """Initialize a Collection object 1054 1055 Arguments: 1056 1057 * manifest_locator_or_text: str | None --- This string can contain a 1058 collection manifest text, portable data hash, or UUID. When given a 1059 portable data hash or UUID, this instance will load a collection 1060 record from the API server. Otherwise, this instance will represent a 1061 new collection without an API server record. The default value `None` 1062 instantiates a new collection with an empty manifest. 1063 1064 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The 1065 Arvados API client object this instance uses to make requests. If 1066 none is given, this instance creates its own client using the 1067 settings from `apiconfig` (see below). If your client instantiates 1068 many Collection objects, you can help limit memory utilization by 1069 calling `arvados.api.api` to construct an 1070 `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client` 1071 for every Collection. 1072 1073 * keep_client: arvados.keep.KeepClient | None --- The Keep client 1074 object this instance uses to make requests. If none is given, this 1075 instance creates its own client using its `api_client`. 1076 1077 * num_retries: int --- The number of times that client requests are 1078 retried. Default 10. 1079 1080 * parent: arvados.collection.Collection | None --- The parent Collection 1081 object of this instance, if any. 
This argument is primarily used by 1082 other Collection methods; user client code shouldn't need to use it. 1083 1084 * apiconfig: Mapping[str, str] | None --- A mapping with entries for 1085 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally 1086 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the 1087 Collection object constructs one from these settings. If no 1088 mapping is provided, calls `arvados.config.settings` to get these 1089 parameters from user configuration. 1090 1091 * block_manager: arvados.arvfile._BlockManager | None --- The 1092 _BlockManager object used by this instance to coordinate reading 1093 and writing Keep data blocks. If none is given, this instance 1094 constructs its own. This argument is primarily used by other 1095 Collection methods; user client code shouldn't need to use it. 1096 1097 * replication_desired: int | None --- This controls both the value of 1098 the `replication_desired` field on API collection records saved by 1099 this class, as well as the number of Keep services that the object 1100 writes new data blocks to. If none is given, uses the default value 1101 configured for the cluster. 1102 1103 * storage_classes_desired: list[str] | None --- This controls both 1104 the value of the `storage_classes_desired` field on API collection 1105 records saved by this class, as well as selecting which specific 1106 Keep services the object writes new data blocks to. If none is 1107 given, defaults to an empty list. 1108 1109 * put_threads: int | None --- The number of threads to run 1110 simultaneously to upload data blocks to Keep. This value is used when 1111 building a new `block_manager`. It is unused when a `block_manager` 1112 is provided. 
1113 """ 1114 1115 if storage_classes_desired and type(storage_classes_desired) is not list: 1116 raise errors.ArgumentError("storage_classes_desired must be list type.") 1117 1118 super(Collection, self).__init__(parent) 1119 self._api_client = api_client 1120 self._keep_client = keep_client 1121 1122 # Use the keep client from ThreadSafeApiCache 1123 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache): 1124 self._keep_client = self._api_client.keep 1125 1126 self._block_manager = block_manager 1127 self.replication_desired = replication_desired 1128 self._storage_classes_desired = storage_classes_desired 1129 self.put_threads = put_threads 1130 1131 if apiconfig: 1132 self._config = apiconfig 1133 else: 1134 self._config = config.settings() 1135 1136 self.num_retries = num_retries 1137 self._manifest_locator = None 1138 self._manifest_text = None 1139 self._portable_data_hash = None 1140 self._api_response = None 1141 self._past_versions = set() 1142 1143 self.lock = threading.RLock() 1144 self.events = None 1145 1146 if manifest_locator_or_text: 1147 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1148 self._manifest_locator = manifest_locator_or_text 1149 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1150 self._manifest_locator = manifest_locator_or_text 1151 if not self._has_local_collection_uuid(): 1152 self._has_remote_blocks = True 1153 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1154 self._manifest_text = manifest_locator_or_text 1155 if '+R' in self._manifest_text: 1156 self._has_remote_blocks = True 1157 else: 1158 raise errors.ArgumentError( 1159 "Argument to CollectionReader is not a manifest or a collection UUID") 1160 1161 try: 1162 self._populate() 1163 except errors.SyntaxError as e: 1164 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
Initialize a Collection object
Arguments:
manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value
None
instantiates a new collection with an empty manifest.

* api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from `apiconfig` (see below). If your client instantiates many Collection objects, you can help limit memory utilization by calling `arvados.api.api` to construct an `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client` for every Collection.

* keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its `api_client`.

* num_retries: int — The number of times that client requests are retried. Default 10.
parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
apiconfig: Mapping[str, str] | None — A mapping with entries for
ARVADOS_API_HOST
,ARVADOS_API_TOKEN
, and optionallyARVADOS_API_HOST_INSECURE
. When noapi_client
is provided, the Collection object constructs one from these settings. If no mapping is provided, callsarvados.config.settings
to get these parameters from user configuration.block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
replication_desired: int | None — This controls both the value of the
replication_desired
field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.storage_classes_desired: list[str] | None — This controls both the value of the
storage_classes_desired
field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new
block_manager
. It is unused when ablock_manager
is provided.
1166 def storage_classes_desired(self) -> List[str]: 1167 """Get this collection's `storage_classes_desired` value""" 1168 return self._storage_classes_desired or []
Get this collection's `storage_classes_desired` value
Get this collection's root collection object

If you open a subcollection with `Collection.find`, calling this method on that subcollection returns the source `Collection` object.
1173 def get_properties(self) -> Properties: 1174 """Get this collection's properties 1175 1176 This method always returns a dict. If this collection object does not 1177 have an associated API record, or that record does not have any 1178 properties set, this method returns an empty dict. 1179 """ 1180 if self._api_response and self._api_response["properties"]: 1181 return self._api_response["properties"] 1182 else: 1183 return {}
Get this collection’s properties
This method always returns a dict. If this collection object does not have an associated API record, or that record does not have any properties set, this method returns an empty dict.
1185 def get_trash_at(self) -> Optional[datetime.datetime]: 1186 """Get this collection's `trash_at` field 1187 1188 This method parses the `trash_at` field of the collection's API 1189 record and returns a datetime from it. If that field is not set, or 1190 this collection object does not have an associated API record, 1191 returns None. 1192 """ 1193 if self._api_response and self._api_response["trash_at"]: 1194 try: 1195 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1196 except ValueError: 1197 return None 1198 else: 1199 return None
Get this collection's `trash_at` field

This method parses the `trash_at` field of the collection's API record and returns a datetime from it. If that field is not set, or this collection object does not have an associated API record, returns `None`.
Get the name of the manifest stream represented by this collection

If you open a subcollection with `Collection.find`, calling this method on that subcollection returns the name of the stream you opened.
Indicate whether this collection object can be modified

This method returns `False` if this object is a `CollectionReader`, else `True`.
1207 @synchronized 1208 def known_past_version( 1209 self, 1210 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] 1211 ) -> bool: 1212 """Indicate whether an API record for this collection has been seen before 1213 1214 As this collection object loads records from the API server, it records 1215 their `modified_at` and `portable_data_hash` fields. This method accepts 1216 a 2-tuple with values for those fields, and returns `True` if the 1217 combination was previously loaded. 1218 """ 1219 return modified_at_and_portable_data_hash in self._past_versions
Indicate whether an API record for this collection has been seen before

As this collection object loads records from the API server, it records their `modified_at` and `portable_data_hash` fields. This method accepts a 2-tuple with values for those fields, and returns `True` if the combination was previously loaded.
1221 @synchronized 1222 @retry_method 1223 def update( 1224 self, 1225 other: Optional['Collection']=None, 1226 num_retries: Optional[int]=None, 1227 ) -> None: 1228 """Merge another collection's contents into this one 1229 1230 This method compares the manifest of this collection instance with 1231 another, then updates this instance's manifest with changes from the 1232 other, renaming files to flag conflicts where necessary. 1233 1234 When called without any arguments, this method reloads the collection's 1235 API record, and updates this instance with any changes that have 1236 appeared server-side. If this instance does not have a corresponding 1237 API record, this method raises `arvados.errors.ArgumentError`. 1238 1239 Arguments: 1240 1241 * other: arvados.collection.Collection | None --- The collection 1242 whose contents should be merged into this instance. When not 1243 provided, this method reloads this collection's API record and 1244 constructs a Collection object from it. If this instance does not 1245 have a corresponding API record, this method raises 1246 `arvados.errors.ArgumentError`. 1247 1248 * num_retries: int | None --- The number of times to retry reloading 1249 the collection's API record from the API server. If not specified, 1250 uses the `num_retries` provided when this instance was constructed. 1251 """ 1252 if other is None: 1253 if self._manifest_locator is None: 1254 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1255 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1256 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and 1257 response.get("portable_data_hash") != self.portable_data_hash()): 1258 # The record on the server is different from our current one, but we've seen it before, 1259 # so ignore it because it's already been merged. 
1260 # However, if it's the same as our current record, proceed with the update, because we want to update 1261 # our tokens. 1262 return 1263 else: 1264 self._remember_api_response(response) 1265 other = CollectionReader(response["manifest_text"]) 1266 baseline = CollectionReader(self._manifest_text) 1267 self.apply(baseline.diff(other)) 1268 self._manifest_text = self.manifest_text()
Merge another collection’s contents into this one
This method compares the manifest of this collection instance with another, then updates this instance’s manifest with changes from the other, renaming files to flag conflicts where necessary.
When called without any arguments, this method reloads the collection’s
API record, and updates this instance with any changes that have
appeared server-side. If this instance does not have a corresponding
API record, this method raises `arvados.errors.ArgumentError`.
Arguments:
other: Collection | None — The collection whose contents should be merged into this instance. When not provided, this method reloads this collection’s API record and constructs a Collection object from it. If this instance does not have a corresponding API record, this method raises
arvados.errors.ArgumentError
.num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the
num_retries
provided when this instance was constructed.
1356 def stop_threads(self) -> None: 1357 """Stop background Keep upload/download threads""" 1358 if self._block_manager is not None: 1359 self._block_manager.stop_threads()
Stop background Keep upload/download threads
1361 @synchronized 1362 def manifest_locator(self) -> Optional[str]: 1363 """Get this collection's manifest locator, if any 1364 1365 * If this collection instance is associated with an API record with a 1366 UUID, return that. 1367 * Otherwise, if this collection instance was loaded from an API record 1368 by portable data hash, return that. 1369 * Otherwise, return `None`. 1370 """ 1371 return self._manifest_locator
Get this collection’s manifest locator, if any
- If this collection instance is associated with an API record with a UUID, return that.
- Otherwise, if this collection instance was loaded from an API record by portable data hash, return that.
- Otherwise, return `None`.
1373 @synchronized 1374 def clone( 1375 self, 1376 new_parent: Optional['Collection']=None, 1377 new_name: Optional[str]=None, 1378 readonly: bool=False, 1379 new_config: Optional[Mapping[str, str]]=None, 1380 ) -> 'Collection': 1381 """Create a Collection object with the same contents as this instance 1382 1383 This method creates a new Collection object with contents that match 1384 this instance's. The new collection will not be associated with any API 1385 record. 1386 1387 Arguments: 1388 1389 * new_parent: arvados.collection.Collection | None --- This value is 1390 passed to the new Collection's constructor as the `parent` 1391 argument. 1392 1393 * new_name: str | None --- This value is unused. 1394 1395 * readonly: bool --- If this value is true, this method constructs and 1396 returns a `CollectionReader`. Otherwise, it returns a mutable 1397 `Collection`. Default `False`. 1398 1399 * new_config: Mapping[str, str] | None --- This value is passed to the 1400 new Collection's constructor as `apiconfig`. If no value is provided, 1401 defaults to the configuration passed to this instance's constructor. 1402 """ 1403 if new_config is None: 1404 new_config = self._config 1405 if readonly: 1406 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1407 else: 1408 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1409 1410 newcollection._clonefrom(self) 1411 return newcollection
Create a Collection object with the same contents as this instance
This method creates a new Collection object with contents that match this instance’s. The new collection will not be associated with any API record.
Arguments:
new_parent: Collection | None — This value is passed to the new Collection’s constructor as the
parent
argument.new_name: str | None — This value is unused.
readonly: bool — If this value is true, this method constructs and returns a
CollectionReader
. Otherwise, it returns a mutableCollection
. DefaultFalse
.new_config: Mapping[str, str] | None — This value is passed to the new Collection’s constructor as
apiconfig
. If no value is provided, defaults to the configuration passed to this instance’s constructor.
1413 @synchronized 1414 def api_response(self) -> Optional[Dict[str, Any]]: 1415 """Get this instance's associated API record 1416 1417 If this Collection instance has an associated API record, return it. 1418 Otherwise, return `None`. 1419 """ 1420 return self._api_response
Get this instance’s associated API record
If this Collection instance has an associated API record, return it.
Otherwise, return `None`.
1422 def find_or_create( 1423 self, 1424 path: str, 1425 create_type: CreateType, 1426 ) -> CollectionItem: 1427 if path == ".": 1428 return self 1429 else: 1430 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
Get the item at the given path, creating it if necessary
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method creates a new object and returns
it, creating parent streams as needed. The type of object created is
determined by the value of create_type
.
Arguments:
path: str — The path to find or create within this collection.
create_type: Literal[COLLECTION, FILE] — The type of object to create at
path
if one does not exist. PassingCOLLECTION
creates a stream and returns the correspondingSubcollection
. PassingFILE
creates a new file and returns the correspondingarvados.arvfile.ArvadosFile
.
1432 def find(self, path: str) -> CollectionItem: 1433 if path == ".": 1434 return self 1435 else: 1436 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
Get the item at the given path
If path
refers to a stream in this collection, returns a
corresponding Subcollection
object. If path
refers to a file in
this collection, returns a corresponding
arvados.arvfile.ArvadosFile
object. If path
does not exist in
this collection, then this method raises `NotADirectoryError`.
Arguments:
- path: str — The path to find or create within this collection.
1438 def remove(self, path: str, recursive: bool=False) -> None: 1439 if path == ".": 1440 raise errors.ArgumentError("Cannot remove '.'") 1441 else: 1442 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
Remove the file or stream at path
Arguments:
path: str — The path of the item to remove from the collection
recursive: bool — Controls the method’s behavior if
path
refers to a nonempty stream. IfFalse
(the default), this method raisesOSError
with errnoENOTEMPTY
. IfTrue
, this method removes all items under the stream.
@must_be_writable
@synchronized
@retry_method
def save(
        self,
        properties: Optional[Properties]=None,
        storage_classes: Optional[StorageClasses]=None,
        trash_at: Optional[datetime.datetime]=None,
        merge: bool=True,
        num_retries: Optional[int]=None,
        preserve_version: bool=False,
) -> str:
    """Save collection to an existing API record

    This method updates the instance's corresponding API record to match
    the instance's state. If this instance does not have a corresponding API
    record yet, raises `AssertionError`. (To create a new API record, use
    `Collection.save_new`.) This method returns the saved collection
    manifest.

    Arguments:

    * properties: dict[str, Any] | None --- If provided, the API record will
      be updated with these properties. Note this will completely replace
      any existing properties.

    * storage_classes: list[str] | None --- If provided, the API record will
      be updated with this value in the `storage_classes_desired` field.
      This value will also be saved on the instance and used for any
      changes that follow.

    * trash_at: datetime.datetime | None --- If provided, the API record
      will be updated with this value in the `trash_at` field.
      Timezone-aware datetimes are converted to UTC before being sent;
      naive datetimes are treated as already being in UTC.

    * merge: bool --- If `True` (the default), this method will first
      reload this collection's API record, and merge any new contents into
      this instance before saving changes. See `Collection.update` for
      details.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.

    * preserve_version: bool --- This value will be passed directly to
      the underlying API call. If `True`, the Arvados API will preserve
      the versions of this collection both immediately before and after
      the update. If `True` when the API server is not configured with
      collection versioning, this method raises
      `arvados.errors.ArgumentError`.
    """
    # isinstance (rather than exact type checks) accepts well-behaved
    # subclasses while still rejecting wrong argument types.
    if properties and not isinstance(properties, dict):
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and not isinstance(storage_classes, list):
        raise errors.ArgumentError("storage_classes must be list type.")
    if storage_classes:
        self._storage_classes_desired = storage_classes

    if trash_at and not isinstance(trash_at, datetime.datetime):
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    body = {}
    if properties:
        body["properties"] = properties
    if self.storage_classes_desired():
        body["storage_classes_desired"] = self.storage_classes_desired()
    if trash_at:
        # Bug fix: a timezone-aware datetime used to be formatted as its
        # local wall-clock time with a literal "Z" suffix, sending the
        # wrong instant to the API. Convert aware values to UTC first;
        # naive values keep the previous assumed-UTC behavior.
        if trash_at.tzinfo is not None:
            trash_at = trash_at.astimezone(datetime.timezone.utc)
        body["trash_at"] = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    if preserve_version:
        body["preserve_version"] = preserve_version

    if not self.committed():
        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False
        if not self._has_collection_uuid():
            raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
        elif not self._has_local_collection_uuid():
            raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

        self._my_block_manager().commit_all()

        if merge:
            self.update()

        text = self.manifest_text(strip=False)
        body['manifest_text'] = text

        self._remember_api_response(self._my_api().collections().update(
            uuid=self._manifest_locator,
            body=body
        ).execute(num_retries=num_retries))
        self._manifest_text = self._api_response["manifest_text"]
        self._portable_data_hash = self._api_response["portable_data_hash"]
        self.set_committed(True)
    elif body:
        # Nothing new to upload, but metadata fields still need saving.
        self._remember_api_response(self._my_api().collections().update(
            uuid=self._manifest_locator,
            body=body
        ).execute(num_retries=num_retries))

    return self._manifest_text
Save collection to an existing API record
This method updates the instance’s corresponding API record to match
the instance’s state. If this instance does not have a corresponding API
record yet, raises AssertionError
. (To create a new API record, use
Collection.save_new
.) This method returns the saved collection
manifest.
Arguments:
properties: dict[str, Any] | None — If provided, the API record will be updated with these properties. Note this will completely replace any existing properties.
storage_classes: list[str] | None — If provided, the API record will be updated with this value in the
storage_classes_desired
field. This value will also be saved on the instance and used for any changes that follow.trash_at: datetime.datetime | None — If provided, the API record will be updated with this value in the
trash_at
field.merge: bool — If
True
(the default), this method will first reload this collection’s API record, and merge any new contents into this instance before saving changes. SeeCollection.update
for details.num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the
num_retries
provided when this instance was constructed.preserve_version: bool — This value will be passed to directly to the underlying API call. If
True
, the Arvados API will preserve the versions of this collection both immediately before and after the update. IfTrue
when the API server is not configured with collection versioning, this method raisesarvados.errors.ArgumentError
.
@must_be_writable
@synchronized
@retry_method
def save_new(
        self,
        name: Optional[str]=None,
        create_collection_record: bool=True,
        owner_uuid: Optional[str]=None,
        properties: Optional[Properties]=None,
        storage_classes: Optional[StorageClasses]=None,
        trash_at: Optional[datetime.datetime]=None,
        ensure_unique_name: bool=False,
        num_retries: Optional[int]=None,
        preserve_version: bool=False,
):
    """Save collection to a new API record

    This method finishes uploading new data blocks and (optionally)
    creates a new API collection record with the provided data. If a new
    record is created, this instance becomes associated with that record
    for future updates like `save()`. This method returns the saved
    collection manifest.

    Arguments:

    * name: str | None --- The `name` field to use on the new collection
      record. If not specified, a generic default name is generated and
      `ensure_unique_name` is forced on.

    * create_collection_record: bool --- If `True` (the default), creates a
      collection record on the API server. If `False`, the method finishes
      all data uploads and only returns the resulting collection manifest
      without sending it to the API server.

    * owner_uuid: str | None --- The `owner_uuid` field to use on the
      new collection record.

    * properties: dict[str, Any] | None --- The `properties` field to use
      on the new collection record.

    * storage_classes: list[str] | None --- The `storage_classes_desired`
      field to use on the new collection record.

    * trash_at: datetime.datetime | None --- The `trash_at` field to use
      on the new collection record. Timezone-aware datetimes are
      converted to UTC before being sent; naive datetimes are treated as
      already being in UTC.

    * ensure_unique_name: bool --- Passed directly to the Arvados API
      when creating the collection record. If `True`, the API server may
      modify the submitted `name` to ensure the collection's
      `name`+`owner_uuid` combination is unique. If `False` (the
      default), a collection that already exists with this same
      `name`+`owner_uuid` combination will cause a validation error.

    * num_retries: int | None --- The number of times to retry the API
      request. If not specified, uses the `num_retries` provided when
      this instance was constructed.

    * preserve_version: bool --- Passed directly to the underlying API
      call. If `True`, the Arvados API will preserve the versions of this
      collection both immediately before and after the update. If `True`
      when the API server is not configured with collection versioning,
      this method raises `arvados.errors.ArgumentError`.
    """
    # isinstance (rather than exact type checks) accepts well-behaved
    # subclasses while still rejecting wrong argument types.
    if properties and not isinstance(properties, dict):
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and not isinstance(storage_classes, list):
        raise errors.ArgumentError("storage_classes must be list type.")

    if trash_at and not isinstance(trash_at, datetime.datetime):
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    if self._has_remote_blocks:
        # Copy any remote blocks to the local cluster.
        self._copy_remote_blocks(remote_blocks={})
        self._has_remote_blocks = False

    if storage_classes:
        self._storage_classes_desired = storage_classes

    self._my_block_manager().commit_all()
    text = self.manifest_text(strip=False)

    if create_collection_record:
        if name is None:
            name = "New collection"
            ensure_unique_name = True

        body = {"manifest_text": text,
                "name": name,
                "replication_desired": self.replication_desired}
        if owner_uuid:
            body["owner_uuid"] = owner_uuid
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            # Bug fix: a timezone-aware datetime used to be formatted as
            # its local wall-clock time with a literal "Z" suffix,
            # sending the wrong instant to the API. Convert aware values
            # to UTC first; naive values keep the assumed-UTC behavior.
            if trash_at.tzinfo is not None:
                trash_at = trash_at.astimezone(datetime.timezone.utc)
            body["trash_at"] = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        if preserve_version:
            body["preserve_version"] = preserve_version

        self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
        text = self._api_response["manifest_text"]

        self._manifest_locator = self._api_response["uuid"]
        self._portable_data_hash = self._api_response["portable_data_hash"]

        self._manifest_text = text
        self.set_committed(True)

    return text
Save collection to a new API record
This method finishes uploading new data blocks and (optionally)
creates a new API collection record with the provided data. If a new
record is created, this instance becomes associated with that record
for future updates like save()
. This method returns the saved
collection manifest.
Arguments:
name: str | None — The
name
field to use on the new collection record. If not specified, a generic default name is generated.create_collection_record: bool — If
True
(the default), creates a collection record on the API server. IfFalse
, the method finishes all data uploads and only returns the resulting collection manifest without sending it to the API server.owner_uuid: str | None — The
owner_uuid
field to use on the new collection record.properties: dict[str, Any] | None — The
properties
field to use on the new collection record.storage_classes: list[str] | None — The
storage_classes_desired
field to use on the new collection record.trash_at: datetime.datetime | None — The
trash_at
field to use on the new collection record.ensure_unique_name: bool — This value is passed directly to the Arvados API when creating the collection record. If
True
, the API server may modify the submittedname
to ensure the collection’sname
+owner_uuid
combination is unique. IfFalse
(the default), if a collection already exists with this samename
+owner_uuid
combination, creating a collection record will raise a validation error.num_retries: int | None — The number of times to retry reloading the collection’s API record from the API server. If not specified, uses the
num_retries
provided when this instance was constructed.preserve_version: bool — This value will be passed to directly to the underlying API call. If
True
, the Arvados API will preserve the versions of this collection both immediately before and after the update. IfTrue
when the API server is not configured with collection versioning, this method raisesarvados.errors.ArgumentError
.
1752 @synchronized 1753 def notify( 1754 self, 1755 event: ChangeType, 1756 collection: 'RichCollectionBase', 1757 name: str, 1758 item: CollectionItem, 1759 ) -> None: 1760 if self._callback: 1761 self._callback(event, collection, name, item)
Notify any subscribed callback about a change to this collection
If a callback has been registered with RichCollectionBase.subscribe
,
it will be called with information about a change to this collection.
Then this notification will be propagated to this collection’s root.
Arguments:
event: Literal[ADD, DEL, MOD, TOK] — The type of modification to the collection.
collection: RichCollectionBase — The collection that was modified.
name: str — The name of the file or stream within
collection
that was modified.item: arvados.arvfile.ArvadosFile | Subcollection — The new contents at
name
withincollection
.
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) within an Arvados
    `Collection`. It is returned by `Collection.find` and provides the same API.
    Operations that work on the API collection record propagate to the parent
    `Collection` object.
    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        self.name = name
        self.num_retries = parent.num_retries
        # Share the root collection's lock so operations on a stream and
        # its root are serialized together.
        self.lock = self.root_collection().lock
        self._manifest_text = None

    def root_collection(self) -> 'Collection':
        """Get the root `Collection` object this stream belongs to."""
        return self.parent.root_collection()

    def writable(self) -> bool:
        """Indicate whether this stream can be modified.

        Delegates to the root collection's writability.
        """
        return self.root_collection().writable()

    def _my_api(self):
        # API client access is always delegated to the root collection.
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this collection."""
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Create a copy of this stream under a new parent and name."""
        duplicate = Subcollection(new_parent, new_name)
        duplicate._clonefrom(self)
        return duplicate

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        # Detach this stream from its current parent and reattach it
        # under a new parent/name, rebinding the shared lock.
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Encode empty directories by using an \056-named (".") empty file"""
        if not self._items:
            return "%s %s 0:0:\\056\n" % (
                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
        return super(Subcollection, self)._get_manifest_text(
            stream_name, strip, normalize, only_committed)
Read and manipulate a stream/directory within an Arvados collection
This class represents a single stream (like a directory) within an Arvados
Collection
. It is returned by Collection.find
and provides the same API.
Operations that work on the API collection record propagate to the parent
Collection
object.
Get this collection’s root collection object
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the source Collection object.
Indicate whether this collection object can be modified
This method returns `False` if this object is a `CollectionReader`, else `True`.
Get the name of the manifest stream represented by this collection
If you open a subcollection with Collection.find
, calling this method
on that subcollection returns the name of the stream you opened.
Inherited Members
class CollectionReader(Collection):
    """Read-only `Collection` subclass

    This class will never create or update any API collection records. You can
    use this class for additional code safety when you only need to read
    existing collections.
    """

    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # The contents never change after initialization, so real locking
        # is unnecessary.
        self.lock = NoopLock()

        # Backwards compatibility with the old CollectionReader
        # all_streams() and all_files() interfaces.
        self._streams = None

    def writable(self) -> bool:
        """Report whether this collection can be modified.

        Only true while `__init__` is populating the instance.
        """
        return self._in_init

    def _populate_streams(orig_func):
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed, since doing so
            # creates a copy of the manifest.
            if self._streams is None:
                manifest = self._manifest_text
                if manifest:
                    self._streams = [
                        line.split()
                        for line in manifest.split("\n")
                        if line
                    ]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def normalize(self):
        """Normalize the streams returned by `all_streams`"""
        grouped = {}
        for reader in self.all_streams():
            for fobj in reader.all_files():
                streamname, filename = split(reader.name() + "/" + fobj.name())
                segments = grouped.setdefault(streamname, {}).setdefault(filename, [])
                for seg in fobj.segments:
                    segments.extend(reader.locators_and_ranges(seg.locator, seg.range_size))
        self._streams = [normalize_stream(sname, grouped[sname])
                         for sname in sorted(grouped)]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_streams(self):
        """Return a `StreamReader` for every stream in the manifest."""
        return [StreamReader(tokens, self._my_keep(), num_retries=self.num_retries)
                for tokens in self._streams]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_files(self):
        """Iterate over every file in every stream of the manifest."""
        for stream in self.all_streams():
            yield from stream.all_files()
Read-only Collection
subclass
This class will never create or update any API collection records. You can use this class for additional code safety when you only need to read existing collections.
1836 def __init__(self, manifest_locator_or_text, *args, **kwargs): 1837 self._in_init = True 1838 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs) 1839 self._in_init = False 1840 1841 # Forego any locking since it should never change once initialized. 1842 self.lock = NoopLock() 1843 1844 # Backwards compatability with old CollectionReader 1845 # all_streams() and all_files() 1846 self._streams = None
Initialize a Collection object
Arguments:
manifest_locator_or_text: str | None — This string can contain a collection manifest text, portable data hash, or UUID. When given a portable data hash or UUID, this instance will load a collection record from the API server. Otherwise, this instance will represent a new collection without an API server record. The default value
None
instantiates a new collection with an empty manifest.api_client: arvados.api_resources.ArvadosAPIClient | None — The Arvados API client object this instance uses to make requests. If none is given, this instance creates its own client using the settings from
apiconfig
(see below). If your client instantiates many Collection objects, you can help limit memory utilization by callingarvados.api.api
to construct anarvados.safeapi.ThreadSafeApiCache
, and use that as theapi_client
for every Collection.keep_client: arvados.keep.KeepClient | None — The Keep client object this instance uses to make requests. If none is given, this instance creates its own client using its
api_client
.num_retries: int — The number of times that client requests are retried. Default 10.
parent: Collection | None — The parent Collection object of this instance, if any. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
apiconfig: Mapping[str, str] | None — A mapping with entries for
ARVADOS_API_HOST
,ARVADOS_API_TOKEN
, and optionallyARVADOS_API_HOST_INSECURE
. When noapi_client
is provided, the Collection object constructs one from these settings. If no mapping is provided, callsarvados.config.settings
to get these parameters from user configuration.block_manager: arvados.arvfile._BlockManager | None — The _BlockManager object used by this instance to coordinate reading and writing Keep data blocks. If none is given, this instance constructs its own. This argument is primarily used by other Collection methods; user client code shouldn’t need to use it.
replication_desired: int | None — This controls both the value of the
replication_desired
field on API collection records saved by this class, as well as the number of Keep services that the object writes new data blocks to. If none is given, uses the default value configured for the cluster.storage_classes_desired: list[str] | None — This controls both the value of the
storage_classes_desired
field on API collection records saved by this class, as well as selecting which specific Keep services the object writes new data blocks to. If none is given, defaults to an empty list.put_threads: int | None — The number of threads to run simultaneously to upload data blocks to Keep. This value is used when building a new
block_manager
. It is unused when ablock_manager
is provided.
Indicate whether this collection object can be modified
This method returns `False` if this object is a `CollectionReader`, else `True`.
1865 @arvados.util._deprecated('3.0', 'Collection iteration') 1866 @_populate_streams 1867 def normalize(self): 1868 """Normalize the streams returned by `all_streams`""" 1869 streams = {} 1870 for s in self.all_streams(): 1871 for f in s.all_files(): 1872 streamname, filename = split(s.name() + "/" + f.name()) 1873 if streamname not in streams: 1874 streams[streamname] = {} 1875 if filename not in streams[streamname]: 1876 streams[streamname][filename] = [] 1877 for r in f.segments: 1878 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size)) 1879 1880 self._streams = [normalize_stream(s, streams[s]) 1881 for s in sorted(streams)]
Normalize the streams returned by all_streams
1883 @arvados.util._deprecated('3.0', 'Collection iteration') 1884 @_populate_streams 1885 def all_streams(self): 1886 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries) 1887 for s in self._streams]
1889 @arvados.util._deprecated('3.0', 'Collection iteration') 1890 @_populate_streams 1891 def all_files(self): 1892 for s in self.all_streams(): 1893 for f in s.all_files(): 1894 yield f
Inherited Members
class CollectionWriter(CollectionBase):
    """Create a new collection from scratch

    .. WARNING:: Deprecated
       This class is deprecated. Prefer `arvados.collection.Collection`
       instead.
    """

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        # Data written but not yet uploaded to Keep, held as a list of byte
        # strings; flushed once it reaches config.KEEP_BLOCK_SIZE.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State for the manifest stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        # Streams already finalized as [name, locators, files] triples.
        self._finished_streams = []
        # Work-queue state used by write_file()/write_directory_tree();
        # see do_queued_work() for how these interact.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        # The most recent file-like object handed out by open().
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        # Only store the manifest in Keep if the `with` block exited cleanly;
        # on an exception, partial state is simply discarded.
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        """Process all queued files, directory entries, and trees."""
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        # Copy the queued file into the collection one Keep block at a time,
        # then close it if we were the ones who opened it.
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        # Handle one entry from the current tree (_queued_trees[0]):
        # subdirectories are re-queued as trees; a regular file is queued
        # and we return so it gets written before more entries are handled.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        # Expand the next queued tree into directory entries, or drop it
        # if the directory is empty.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = arvados.util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # `source` is a path: open it ourselves and remember to close it.
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (a path or file-like object) to the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Recursively write a local directory tree to the collection."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data to the current file.

        Accepts bytes, str (encoded as UTF-8), or any iterable of either;
        buffered data is flushed to Keep in KEEP_BLOCK_SIZE chunks.
        """
        if isinstance(newdata, bytes):
            pass
        elif isinstance(newdata, str):
            newdata = newdata.encode()
        elif hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy:

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                u"can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        """Upload one buffered Keep block, keeping any remainder buffered."""
        data_buffer = b''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            # Keep any bytes beyond one block for the next flush.
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        # Tabs, newlines, and NULs cannot be escaped in manifest file names.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's segment in the current stream."""
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No data was written since the last file ended; nothing to do.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # Each file entry is [offset, length, name] within the stream.
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace: '%s'" %
                (newstreamname))
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        """Finalize the current stream and reset per-stream state."""
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            # No files in this stream; drop it silently.
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                # A stream must reference at least one block locator.
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest in Keep and return its locator.

        This is useful for storing manifest fragments (task outputs)
        temporarily in Keep during a Crunch job.

        In other cases you should make a collection instead, by
        sending manifest_text() to the API server's "create
        collection" endpoint.
        """
        return self._my_keep().put(self.manifest_text().encode(),
                                   copies=self.replication)

    def portable_data_hash(self):
        """Return the portable data hash (md5+size) of the manifest."""
        stripped = self.stripped_manifest().encode()
        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

    def manifest_text(self):
        """Finalize the current stream and return the full manifest text."""
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            # Stream names in a manifest must start with './' or be '.'.
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            # Spaces are escaped as \040 in manifest tokens.
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        """Return all block locators referenced by finished streams."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

    def save_new(self, name=None):
        """Create a new collection record on the API server and return it."""
        return self._api_client.collections().create(
            ensure_unique_name=True,
            body={
                'name': name,
                'manifest_text': self.manifest_text(),
            }).execute(num_retries=self.num_retries)
Create a new collection from scratch
1905 @arvados.util._deprecated('3.0', 'arvados.collection.Collection') 1906 def __init__(self, api_client=None, num_retries=0, replication=None): 1907 """Instantiate a CollectionWriter. 1908 1909 CollectionWriter lets you build a new Arvados Collection from scratch. 1910 Write files to it. The CollectionWriter will upload data to Keep as 1911 appropriate, and provide you with the Collection manifest text when 1912 you're finished. 1913 1914 Arguments: 1915 * api_client: The API client to use to look up Collections. If not 1916 provided, CollectionReader will build one from available Arvados 1917 configuration. 1918 * num_retries: The default number of times to retry failed 1919 service requests. Default 0. You may change this value 1920 after instantiation, but note those changes may not 1921 propagate to related objects like the Keep client. 1922 * replication: The number of copies of each block to store. 1923 If this argument is None or not supplied, replication is 1924 the server-provided default if available, otherwise 2. 1925 """ 1926 self._api_client = api_client 1927 self.num_retries = num_retries 1928 self.replication = (2 if replication is None else replication) 1929 self._keep_client = None 1930 self._data_buffer = [] 1931 self._data_buffer_len = 0 1932 self._current_stream_files = [] 1933 self._current_stream_length = 0 1934 self._current_stream_locators = [] 1935 self._current_stream_name = '.' 1936 self._current_file_name = None 1937 self._current_file_pos = 0 1938 self._finished_streams = [] 1939 self._close_file = None 1940 self._queued_file = None 1941 self._queued_dirents = deque() 1942 self._queued_trees = deque() 1943 self._last_open = None
Instantiate a CollectionWriter.
CollectionWriter lets you build a new Arvados Collection from scratch. Write files to it. The CollectionWriter will upload data to Keep as appropriate, and provide you with the Collection manifest text when you're finished.
Arguments:
- api_client: The API client to use to look up Collections. If not provided, CollectionReader will build one from available Arvados configuration.
- num_retries: The default number of times to retry failed service requests. Default 0. You may change this value after instantiation, but note those changes may not propagate to related objects like the Keep client.
- replication: The number of copies of each block to store. If this argument is None or not supplied, replication is the server-provided default if available, otherwise 2.
1949 def do_queued_work(self): 1950 # The work queue consists of three pieces: 1951 # * _queued_file: The file object we're currently writing to the 1952 # Collection. 1953 # * _queued_dirents: Entries under the current directory 1954 # (_queued_trees[0]) that we want to write or recurse through. 1955 # This may contain files from subdirectories if 1956 # max_manifest_depth == 0 for this directory. 1957 # * _queued_trees: Directories that should be written as separate 1958 # streams to the Collection. 1959 # This function handles the smallest piece of work currently queued 1960 # (current file, then current directory, then next directory) until 1961 # no work remains. The _work_THING methods each do a unit of work on 1962 # THING. _queue_THING methods add a THING to the work queue. 1963 while True: 1964 if self._queued_file: 1965 self._work_file() 1966 elif self._queued_dirents: 1967 self._work_dirents() 1968 elif self._queued_trees: 1969 self._work_trees() 1970 else: 1971 break
2039 def write(self, newdata): 2040 if isinstance(newdata, bytes): 2041 pass 2042 elif isinstance(newdata, str): 2043 newdata = newdata.encode() 2044 elif hasattr(newdata, '__iter__'): 2045 for s in newdata: 2046 self.write(s) 2047 return 2048 self._data_buffer.append(newdata) 2049 self._data_buffer_len += len(newdata) 2050 self._current_stream_length += len(newdata) 2051 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE: 2052 self.flush_data()
2054 def open(self, streampath, filename=None): 2055 """open(streampath[, filename]) -> file-like object 2056 2057 Pass in the path of a file to write to the Collection, either as a 2058 single string or as two separate stream name and file name arguments. 2059 This method returns a file-like object you can write to add it to the 2060 Collection. 2061 2062 You may only have one file object from the Collection open at a time, 2063 so be sure to close the object when you're done. Using the object in 2064 a with statement makes that easy: 2065 2066 with cwriter.open('./doc/page1.txt') as outfile: 2067 outfile.write(page1_data) 2068 with cwriter.open('./doc/page2.txt') as outfile: 2069 outfile.write(page2_data) 2070 """ 2071 if filename is None: 2072 streampath, filename = split(streampath) 2073 if self._last_open and not self._last_open.closed: 2074 raise errors.AssertionError( 2075 u"can't open '{}' when '{}' is still open".format( 2076 filename, self._last_open.name)) 2077 if streampath != self.current_stream_name(): 2078 self.start_new_stream(streampath) 2079 self.set_current_file_name(filename) 2080 self._last_open = _WriterFile(self, filename) 2081 return self._last_open
open(streampath[, filename]) -> file-like object
Pass in the path of a file to write to the Collection, either as a single string or as two separate stream name and file name arguments. This method returns a file-like object you can write to add it to the Collection.
You may only have one file object from the Collection open at a time, so be sure to close the object when you're done. Using the object in a with statement makes that easy:
with cwriter.open('./doc/page1.txt') as outfile:
outfile.write(page1_data)
with cwriter.open('./doc/page2.txt') as outfile:
outfile.write(page2_data)
2083 def flush_data(self): 2084 data_buffer = b''.join(self._data_buffer) 2085 if data_buffer: 2086 self._current_stream_locators.append( 2087 self._my_keep().put( 2088 data_buffer[0:config.KEEP_BLOCK_SIZE], 2089 copies=self.replication)) 2090 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]] 2091 self._data_buffer_len = len(self._data_buffer[0])
2097 def set_current_file_name(self, newfilename): 2098 if re.search(r'[\t\n]', newfilename): 2099 raise errors.AssertionError( 2100 "Manifest filenames cannot contain whitespace: %s" % 2101 newfilename) 2102 elif re.search(r'\x00', newfilename): 2103 raise errors.AssertionError( 2104 "Manifest filenames cannot contain NUL characters: %s" % 2105 newfilename) 2106 self._current_file_name = newfilename
2111 def finish_current_file(self): 2112 if self._current_file_name is None: 2113 if self._current_file_pos == self._current_stream_length: 2114 return 2115 raise errors.AssertionError( 2116 "Cannot finish an unnamed file " + 2117 "(%d bytes at offset %d in '%s' stream)" % 2118 (self._current_stream_length - self._current_file_pos, 2119 self._current_file_pos, 2120 self._current_stream_name)) 2121 self._current_stream_files.append([ 2122 self._current_file_pos, 2123 self._current_stream_length - self._current_file_pos, 2124 self._current_file_name]) 2125 self._current_file_pos = self._current_stream_length 2126 self._current_file_name = None
2142 def finish_current_stream(self): 2143 self.finish_current_file() 2144 self.flush_data() 2145 if not self._current_stream_files: 2146 pass 2147 elif self._current_stream_name is None: 2148 raise errors.AssertionError( 2149 "Cannot finish an unnamed stream (%d bytes in %d files)" % 2150 (self._current_stream_length, len(self._current_stream_files))) 2151 else: 2152 if not self._current_stream_locators: 2153 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR) 2154 self._finished_streams.append([self._current_stream_name, 2155 self._current_stream_locators, 2156 self._current_stream_files]) 2157 self._current_stream_files = [] 2158 self._current_stream_length = 0 2159 self._current_stream_locators = [] 2160 self._current_stream_name = None 2161 self._current_file_pos = 0 2162 self._current_file_name = None
2164 def finish(self): 2165 """Store the manifest in Keep and return its locator. 2166 2167 This is useful for storing manifest fragments (task outputs) 2168 temporarily in Keep during a Crunch job. 2169 2170 In other cases you should make a collection instead, by 2171 sending manifest_text() to the API server's "create 2172 collection" endpoint. 2173 """ 2174 return self._my_keep().put(self.manifest_text().encode(), 2175 copies=self.replication)
Store the manifest in Keep and return its locator.
This is useful for storing manifest fragments (task outputs) temporarily in Keep during a Crunch job.
In other cases you should make a collection instead, by sending manifest_text() to the API server's "create collection" endpoint.
2181 def manifest_text(self): 2182 self.finish_current_stream() 2183 manifest = '' 2184 2185 for stream in self._finished_streams: 2186 if not re.search(r'^\.(/.*)?$', stream[0]): 2187 manifest += './' 2188 manifest += stream[0].replace(' ', '\\040') 2189 manifest += ' ' + ' '.join(stream[1]) 2190 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2]) 2191 manifest += "\n" 2192 2193 return manifest
Inherited Members
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can serialize internal state to disk

    .. WARNING:: Deprecated
       This class is deprecated. Prefer `arvados.collection.Collection`
       instead.
    """

    # Writer attributes that dump_state() serializes and from_state() restores.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, **kwargs):
        # Maps source file paths to the stat tuples recorded when they were
        # queued, so a resumed writer can detect changed inputs.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        """Build a writer from previously dumped state.

        Raises StaleWriterStateError if the state is not suitable to resume
        (files changed or deleted, expired block permissions, etc.).
        """
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            # Reopen the file that was mid-write and seek back to where the
            # previous writer stopped.
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Verify that recorded source files are unchanged on disk.

        Raises StaleWriterStateError if any dependency is missing, is no
        longer a regular file, or has a different mtime or size.
        """
        for path, orig_stat in listitems(self._dependencies):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(u"{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    u"failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError(u"{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a serializable snapshot of the writer's state.

        copy_func, if given, is applied to each state value (e.g. to deep-copy
        mutable containers before the caller stores them).
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            # Record the in-progress file's real path and read position so
            # from_state() can reopen and seek it.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        # Unlike the parent class, only file paths are accepted here, because
        # arbitrary file-like objects can't be reopened on resume.
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError(u"{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                u"could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            # The path was replaced between the stat and the open.
            raise errors.AssertionError(
                u"{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        # Data may only arrive via a queued file; otherwise resume state
        # could not account for it.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
CollectionWriter that can serialize internal state to disk
2224 @arvados.util._deprecated('3.0', 'arvados.collection.Collection') 2225 def __init__(self, api_client=None, **kwargs): 2226 self._dependencies = {} 2227 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
2229 @classmethod 2230 def from_state(cls, state, *init_args, **init_kwargs): 2231 # Try to build a new writer from scratch with the given state. 2232 # If the state is not suitable to resume (because files have changed, 2233 # been deleted, aren't predictable, etc.), raise a 2234 # StaleWriterStateError. Otherwise, return the initialized writer. 2235 # The caller is responsible for calling writer.do_queued_work() 2236 # appropriately after it's returned. 2237 writer = cls(*init_args, **init_kwargs) 2238 for attr_name in cls.STATE_PROPS: 2239 attr_value = state[attr_name] 2240 attr_class = getattr(writer, attr_name).__class__ 2241 # Coerce the value into the same type as the initial value, if 2242 # needed. 2243 if attr_class not in (type(None), attr_value.__class__): 2244 attr_value = attr_class(attr_value) 2245 setattr(writer, attr_name, attr_value) 2246 # Check dependencies before we try to resume anything. 2247 if any(KeepLocator(ls).permission_expired() 2248 for ls in writer._current_stream_locators): 2249 raise errors.StaleWriterStateError( 2250 "locators include expired permission hint") 2251 writer.check_dependencies() 2252 if state['_current_file'] is not None: 2253 path, pos = state['_current_file'] 2254 try: 2255 writer._queued_file = open(path, 'rb') 2256 writer._queued_file.seek(pos) 2257 except IOError as error: 2258 raise errors.StaleWriterStateError( 2259 u"failed to reopen active file {}: {}".format(path, error)) 2260 return writer
2262 def check_dependencies(self): 2263 for path, orig_stat in listitems(self._dependencies): 2264 if not S_ISREG(orig_stat[ST_MODE]): 2265 raise errors.StaleWriterStateError(u"{} not file".format(path)) 2266 try: 2267 now_stat = tuple(os.stat(path)) 2268 except OSError as error: 2269 raise errors.StaleWriterStateError( 2270 u"failed to stat {}: {}".format(path, error)) 2271 if ((not S_ISREG(now_stat[ST_MODE])) or 2272 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or 2273 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])): 2274 raise errors.StaleWriterStateError(u"{} changed".format(path))
2276 def dump_state(self, copy_func=lambda x: x): 2277 state = {attr: copy_func(getattr(self, attr)) 2278 for attr in self.STATE_PROPS} 2279 if self._queued_file is None: 2280 state['_current_file'] = None 2281 else: 2282 state['_current_file'] = (os.path.realpath(self._queued_file.name), 2283 self._queued_file.tell()) 2284 return state
Inherited Members
- CollectionWriter
- num_retries
- replication
- do_queued_work
- write_file
- write_directory_tree
- open
- flush_data
- start_new_file
- set_current_file_name
- current_file_name
- finish_current_file
- start_new_stream
- set_current_stream_name
- current_stream_name
- finish_current_stream
- finish
- portable_data_hash
- manifest_text
- data_locators
- save_new