Package arvados :: Module collection
[hide private]
[frames] | no frames]

Source Code for Module arvados.collection

   1  # Copyright (C) The Arvados Authors. All rights reserved. 
   2  # 
   3  # SPDX-License-Identifier: Apache-2.0 
   4   
   5  from __future__ import absolute_import 
   6  from future.utils import listitems, listvalues, viewkeys 
   7  from builtins import str 
   8  from past.builtins import basestring 
   9  from builtins import object 
  10  import ciso8601 
  11  import datetime 
  12  import errno 
  13  import functools 
  14  import hashlib 
  15  import io 
  16  import logging 
  17  import os 
  18  import re 
  19  import sys 
  20  import threading 
  21  import time 
  22   
  23  from collections import deque 
  24  from stat import * 
  25   
  26  from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock 
  27  from .keep import KeepLocator, KeepClient 
  28  from .stream import StreamReader 
  29  from ._normalize_stream import normalize_stream, escape 
  30  from ._ranges import Range, LocatorAndRange 
  31  from .safeapi import ThreadSafeApiCache 
  32  import arvados.config as config 
  33  import arvados.errors as errors 
  34  import arvados.util 
  35  import arvados.events as events 
  36  from arvados.retry import retry_method 
  37   
  38  _logger = logging.getLogger('arvados.collection') 
  39   
  40   
  41  if sys.version_info >= (3, 0): 
  42      TextIOWrapper = io.TextIOWrapper 
  43  else: 
44 - class TextIOWrapper(io.TextIOWrapper):
45 """To maintain backward compatibility, cast str to unicode in 46 write('foo'). 47 48 """
49 - def write(self, data):
50 if isinstance(data, basestring): 51 data = unicode(data) 52 return super(TextIOWrapper, self).write(data)
53
54 55 -class CollectionBase(object):
56 """Abstract base class for Collection classes.""" 57
58 - def __enter__(self):
59 return self
60
61 - def __exit__(self, exc_type, exc_value, traceback):
62 pass
63
64 - def _my_keep(self):
65 if self._keep_client is None: 66 self._keep_client = KeepClient(api_client=self._api_client, 67 num_retries=self.num_retries) 68 return self._keep_client
69
70 - def stripped_manifest(self):
71 """Get the manifest with locator hints stripped. 72 73 Return the manifest for the current collection with all 74 non-portable hints (i.e., permission signatures and other 75 hints other than size hints) removed from the locators. 76 """ 77 raw = self.manifest_text() 78 clean = [] 79 for line in raw.split("\n"): 80 fields = line.split() 81 if fields: 82 clean_fields = fields[:1] + [ 83 (re.sub(r'\+[^\d][^\+]*', '', x) 84 if re.match(arvados.util.keep_locator_pattern, x) 85 else x) 86 for x in fields[1:]] 87 clean += [' '.join(clean_fields), "\n"] 88 return ''.join(clean)
89
90 91 -class _WriterFile(_FileLikeObjectBase):
92 - def __init__(self, coll_writer, name):
93 super(_WriterFile, self).__init__(name, 'wb') 94 self.dest = coll_writer
95
96 - def close(self):
97 super(_WriterFile, self).close() 98 self.dest.finish_current_file()
99 100 @_FileLikeObjectBase._before_close
101 - def write(self, data):
102 self.dest.write(data)
103 104 @_FileLikeObjectBase._before_close
105 - def writelines(self, seq):
106 for data in seq: 107 self.write(data)
108 109 @_FileLikeObjectBase._before_close
110 - def flush(self):
111 self.dest.flush_data()
112
113 114 -class CollectionWriter(CollectionBase):
115 """Deprecated, use Collection instead.""" 116
117 - def __init__(self, api_client=None, num_retries=0, replication=None):
118 """Instantiate a CollectionWriter. 119 120 CollectionWriter lets you build a new Arvados Collection from scratch. 121 Write files to it. The CollectionWriter will upload data to Keep as 122 appropriate, and provide you with the Collection manifest text when 123 you're finished. 124 125 Arguments: 126 * api_client: The API client to use to look up Collections. If not 127 provided, CollectionReader will build one from available Arvados 128 configuration. 129 * num_retries: The default number of times to retry failed 130 service requests. Default 0. You may change this value 131 after instantiation, but note those changes may not 132 propagate to related objects like the Keep client. 133 * replication: The number of copies of each block to store. 134 If this argument is None or not supplied, replication is 135 the server-provided default if available, otherwise 2. 136 """ 137 self._api_client = api_client 138 self.num_retries = num_retries 139 self.replication = (2 if replication is None else replication) 140 self._keep_client = None 141 self._data_buffer = [] 142 self._data_buffer_len = 0 143 self._current_stream_files = [] 144 self._current_stream_length = 0 145 self._current_stream_locators = [] 146 self._current_stream_name = '.' 147 self._current_file_name = None 148 self._current_file_pos = 0 149 self._finished_streams = [] 150 self._close_file = None 151 self._queued_file = None 152 self._queued_dirents = deque() 153 self._queued_trees = deque() 154 self._last_open = None
155
156 - def __exit__(self, exc_type, exc_value, traceback):
157 if exc_type is None: 158 self.finish()
159
160 - def do_queued_work(self):
161 # The work queue consists of three pieces: 162 # * _queued_file: The file object we're currently writing to the 163 # Collection. 164 # * _queued_dirents: Entries under the current directory 165 # (_queued_trees[0]) that we want to write or recurse through. 166 # This may contain files from subdirectories if 167 # max_manifest_depth == 0 for this directory. 168 # * _queued_trees: Directories that should be written as separate 169 # streams to the Collection. 170 # This function handles the smallest piece of work currently queued 171 # (current file, then current directory, then next directory) until 172 # no work remains. The _work_THING methods each do a unit of work on 173 # THING. _queue_THING methods add a THING to the work queue. 174 while True: 175 if self._queued_file: 176 self._work_file() 177 elif self._queued_dirents: 178 self._work_dirents() 179 elif self._queued_trees: 180 self._work_trees() 181 else: 182 break
183
184 - def _work_file(self):
185 while True: 186 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE) 187 if not buf: 188 break 189 self.write(buf) 190 self.finish_current_file() 191 if self._close_file: 192 self._queued_file.close() 193 self._close_file = None 194 self._queued_file = None
195
196 - def _work_dirents(self):
197 path, stream_name, max_manifest_depth = self._queued_trees[0] 198 if stream_name != self.current_stream_name(): 199 self.start_new_stream(stream_name) 200 while self._queued_dirents: 201 dirent = self._queued_dirents.popleft() 202 target = os.path.join(path, dirent) 203 if os.path.isdir(target): 204 self._queue_tree(target, 205 os.path.join(stream_name, dirent), 206 max_manifest_depth - 1) 207 else: 208 self._queue_file(target, dirent) 209 break 210 if not self._queued_dirents: 211 self._queued_trees.popleft()
212
213 - def _work_trees(self):
214 path, stream_name, max_manifest_depth = self._queued_trees[0] 215 d = arvados.util.listdir_recursive( 216 path, max_depth = (None if max_manifest_depth == 0 else 0)) 217 if d: 218 self._queue_dirents(stream_name, d) 219 else: 220 self._queued_trees.popleft()
221
222 - def _queue_file(self, source, filename=None):
223 assert (self._queued_file is None), "tried to queue more than one file" 224 if not hasattr(source, 'read'): 225 source = open(source, 'rb') 226 self._close_file = True 227 else: 228 self._close_file = False 229 if filename is None: 230 filename = os.path.basename(source.name) 231 self.start_new_file(filename) 232 self._queued_file = source
233
234 - def _queue_dirents(self, stream_name, dirents):
235 assert (not self._queued_dirents), "tried to queue more than one tree" 236 self._queued_dirents = deque(sorted(dirents))
237
238 - def _queue_tree(self, path, stream_name, max_manifest_depth):
239 self._queued_trees.append((path, stream_name, max_manifest_depth))
240
241 - def write_file(self, source, filename=None):
242 self._queue_file(source, filename) 243 self.do_queued_work()
244
245 - def write_directory_tree(self, 246 path, stream_name='.', max_manifest_depth=-1):
247 self._queue_tree(path, stream_name, max_manifest_depth) 248 self.do_queued_work()
249
250 - def write(self, newdata):
251 if isinstance(newdata, bytes): 252 pass 253 elif isinstance(newdata, str): 254 newdata = newdata.encode() 255 elif hasattr(newdata, '__iter__'): 256 for s in newdata: 257 self.write(s) 258 return 259 self._data_buffer.append(newdata) 260 self._data_buffer_len += len(newdata) 261 self._current_stream_length += len(newdata) 262 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE: 263 self.flush_data()
264
265 - def open(self, streampath, filename=None):
266 """open(streampath[, filename]) -> file-like object 267 268 Pass in the path of a file to write to the Collection, either as a 269 single string or as two separate stream name and file name arguments. 270 This method returns a file-like object you can write to add it to the 271 Collection. 272 273 You may only have one file object from the Collection open at a time, 274 so be sure to close the object when you're done. Using the object in 275 a with statement makes that easy:: 276 277 with cwriter.open('./doc/page1.txt') as outfile: 278 outfile.write(page1_data) 279 with cwriter.open('./doc/page2.txt') as outfile: 280 outfile.write(page2_data) 281 """ 282 if filename is None: 283 streampath, filename = split(streampath) 284 if self._last_open and not self._last_open.closed: 285 raise errors.AssertionError( 286 u"can't open '{}' when '{}' is still open".format( 287 filename, self._last_open.name)) 288 if streampath != self.current_stream_name(): 289 self.start_new_stream(streampath) 290 self.set_current_file_name(filename) 291 self._last_open = _WriterFile(self, filename) 292 return self._last_open
293
294 - def flush_data(self):
295 data_buffer = b''.join(self._data_buffer) 296 if data_buffer: 297 self._current_stream_locators.append( 298 self._my_keep().put( 299 data_buffer[0:config.KEEP_BLOCK_SIZE], 300 copies=self.replication)) 301 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]] 302 self._data_buffer_len = len(self._data_buffer[0])
303
304 - def start_new_file(self, newfilename=None):
305 self.finish_current_file() 306 self.set_current_file_name(newfilename)
307
308 - def set_current_file_name(self, newfilename):
309 if re.search(r'[\t\n]', newfilename): 310 raise errors.AssertionError( 311 "Manifest filenames cannot contain whitespace: %s" % 312 newfilename) 313 elif re.search(r'\x00', newfilename): 314 raise errors.AssertionError( 315 "Manifest filenames cannot contain NUL characters: %s" % 316 newfilename) 317 self._current_file_name = newfilename
318
319 - def current_file_name(self):
320 return self._current_file_name
321
322 - def finish_current_file(self):
323 if self._current_file_name is None: 324 if self._current_file_pos == self._current_stream_length: 325 return 326 raise errors.AssertionError( 327 "Cannot finish an unnamed file " + 328 "(%d bytes at offset %d in '%s' stream)" % 329 (self._current_stream_length - self._current_file_pos, 330 self._current_file_pos, 331 self._current_stream_name)) 332 self._current_stream_files.append([ 333 self._current_file_pos, 334 self._current_stream_length - self._current_file_pos, 335 self._current_file_name]) 336 self._current_file_pos = self._current_stream_length 337 self._current_file_name = None
338
339 - def start_new_stream(self, newstreamname='.'):
340 self.finish_current_stream() 341 self.set_current_stream_name(newstreamname)
342
343 - def set_current_stream_name(self, newstreamname):
344 if re.search(r'[\t\n]', newstreamname): 345 raise errors.AssertionError( 346 "Manifest stream names cannot contain whitespace: '%s'" % 347 (newstreamname)) 348 self._current_stream_name = '.' if newstreamname=='' else newstreamname
349
350 - def current_stream_name(self):
351 return self._current_stream_name
352
353 - def finish_current_stream(self):
354 self.finish_current_file() 355 self.flush_data() 356 if not self._current_stream_files: 357 pass 358 elif self._current_stream_name is None: 359 raise errors.AssertionError( 360 "Cannot finish an unnamed stream (%d bytes in %d files)" % 361 (self._current_stream_length, len(self._current_stream_files))) 362 else: 363 if not self._current_stream_locators: 364 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR) 365 self._finished_streams.append([self._current_stream_name, 366 self._current_stream_locators, 367 self._current_stream_files]) 368 self._current_stream_files = [] 369 self._current_stream_length = 0 370 self._current_stream_locators = [] 371 self._current_stream_name = None 372 self._current_file_pos = 0 373 self._current_file_name = None
374
375 - def finish(self):
376 """Store the manifest in Keep and return its locator. 377 378 This is useful for storing manifest fragments (task outputs) 379 temporarily in Keep during a Crunch job. 380 381 In other cases you should make a collection instead, by 382 sending manifest_text() to the API server's "create 383 collection" endpoint. 384 """ 385 return self._my_keep().put(self.manifest_text().encode(), 386 copies=self.replication)
387
388 - def portable_data_hash(self):
389 stripped = self.stripped_manifest().encode() 390 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
391
392 - def manifest_text(self):
393 self.finish_current_stream() 394 manifest = '' 395 396 for stream in self._finished_streams: 397 if not re.search(r'^\.(/.*)?$', stream[0]): 398 manifest += './' 399 manifest += stream[0].replace(' ', '\\040') 400 manifest += ' ' + ' '.join(stream[1]) 401 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2]) 402 manifest += "\n" 403 404 return manifest
405
406 - def data_locators(self):
407 ret = [] 408 for name, locators, files in self._finished_streams: 409 ret += locators 410 return ret
411
412 - def save_new(self, name=None):
413 return self._api_client.collections().create( 414 ensure_unique_name=True, 415 body={ 416 'name': name, 417 'manifest_text': self.manifest_text(), 418 }).execute(num_retries=self.num_retries)
419
420 421 -class ResumableCollectionWriter(CollectionWriter):
422 """Deprecated, use Collection instead.""" 423 424 STATE_PROPS = ['_current_stream_files', '_current_stream_length', 425 '_current_stream_locators', '_current_stream_name', 426 '_current_file_name', '_current_file_pos', '_close_file', 427 '_data_buffer', '_dependencies', '_finished_streams', 428 '_queued_dirents', '_queued_trees'] 429
430 - def __init__(self, api_client=None, **kwargs):
431 self._dependencies = {} 432 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
433 434 @classmethod
435 - def from_state(cls, state, *init_args, **init_kwargs):
436 # Try to build a new writer from scratch with the given state. 437 # If the state is not suitable to resume (because files have changed, 438 # been deleted, aren't predictable, etc.), raise a 439 # StaleWriterStateError. Otherwise, return the initialized writer. 440 # The caller is responsible for calling writer.do_queued_work() 441 # appropriately after it's returned. 442 writer = cls(*init_args, **init_kwargs) 443 for attr_name in cls.STATE_PROPS: 444 attr_value = state[attr_name] 445 attr_class = getattr(writer, attr_name).__class__ 446 # Coerce the value into the same type as the initial value, if 447 # needed. 448 if attr_class not in (type(None), attr_value.__class__): 449 attr_value = attr_class(attr_value) 450 setattr(writer, attr_name, attr_value) 451 # Check dependencies before we try to resume anything. 452 if any(KeepLocator(ls).permission_expired() 453 for ls in writer._current_stream_locators): 454 raise errors.StaleWriterStateError( 455 "locators include expired permission hint") 456 writer.check_dependencies() 457 if state['_current_file'] is not None: 458 path, pos = state['_current_file'] 459 try: 460 writer._queued_file = open(path, 'rb') 461 writer._queued_file.seek(pos) 462 except IOError as error: 463 raise errors.StaleWriterStateError( 464 u"failed to reopen active file {}: {}".format(path, error)) 465 return writer
466
467 - def check_dependencies(self):
468 for path, orig_stat in listitems(self._dependencies): 469 if not S_ISREG(orig_stat[ST_MODE]): 470 raise errors.StaleWriterStateError(u"{} not file".format(path)) 471 try: 472 now_stat = tuple(os.stat(path)) 473 except OSError as error: 474 raise errors.StaleWriterStateError( 475 u"failed to stat {}: {}".format(path, error)) 476 if ((not S_ISREG(now_stat[ST_MODE])) or 477 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or 478 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])): 479 raise errors.StaleWriterStateError(u"{} changed".format(path))
480
481 - def dump_state(self, copy_func=lambda x: x):
482 state = {attr: copy_func(getattr(self, attr)) 483 for attr in self.STATE_PROPS} 484 if self._queued_file is None: 485 state['_current_file'] = None 486 else: 487 state['_current_file'] = (os.path.realpath(self._queued_file.name), 488 self._queued_file.tell()) 489 return state
490
491 - def _queue_file(self, source, filename=None):
492 try: 493 src_path = os.path.realpath(source) 494 except Exception: 495 raise errors.AssertionError(u"{} not a file path".format(source)) 496 try: 497 path_stat = os.stat(src_path) 498 except OSError as stat_error: 499 path_stat = None 500 super(ResumableCollectionWriter, self)._queue_file(source, filename) 501 fd_stat = os.fstat(self._queued_file.fileno()) 502 if not S_ISREG(fd_stat.st_mode): 503 # We won't be able to resume from this cache anyway, so don't 504 # worry about further checks. 505 self._dependencies[source] = tuple(fd_stat) 506 elif path_stat is None: 507 raise errors.AssertionError( 508 u"could not stat {}: {}".format(source, stat_error)) 509 elif path_stat.st_ino != fd_stat.st_ino: 510 raise errors.AssertionError( 511 u"{} changed between open and stat calls".format(source)) 512 else: 513 self._dependencies[src_path] = tuple(fd_stat)
514
515 - def write(self, data):
516 if self._queued_file is None: 517 raise errors.AssertionError( 518 "resumable writer can't accept unsourced data") 519 return super(ResumableCollectionWriter, self).write(data)
520 521 522 ADD = "add" 523 DEL = "del" 524 MOD = "mod" 525 TOK = "tok" 526 FILE = "file" 527 COLLECTION = "collection"
528 529 -class RichCollectionBase(CollectionBase):
530 """Base class for Collections and Subcollections. 531 532 Implements the majority of functionality relating to accessing items in the 533 Collection. 534 535 """ 536
537 - def __init__(self, parent=None):
538 self.parent = parent 539 self._committed = False 540 self._has_remote_blocks = False 541 self._callback = None 542 self._items = {}
543
544 - def _my_api(self):
545 raise NotImplementedError()
546
547 - def _my_keep(self):
548 raise NotImplementedError()
549
550 - def _my_block_manager(self):
551 raise NotImplementedError()
552
553 - def writable(self):
554 raise NotImplementedError()
555
556 - def root_collection(self):
557 raise NotImplementedError()
558
559 - def notify(self, event, collection, name, item):
560 raise NotImplementedError()
561
562 - def stream_name(self):
563 raise NotImplementedError()
564 565 566 @synchronized
567 - def has_remote_blocks(self):
568 """Recursively check for a +R segment locator signature.""" 569 570 if self._has_remote_blocks: 571 return True 572 for item in self: 573 if self[item].has_remote_blocks(): 574 return True 575 return False
576 577 @synchronized
578 - def set_has_remote_blocks(self, val):
579 self._has_remote_blocks = val 580 if self.parent: 581 self.parent.set_has_remote_blocks(val)
582 583 @must_be_writable 584 @synchronized
585 - def find_or_create(self, path, create_type):
586 """Recursively search the specified file path. 587 588 May return either a `Collection` or `ArvadosFile`. If not found, will 589 create a new item at the specified path based on `create_type`. Will 590 create intermediate subcollections needed to contain the final item in 591 the path. 592 593 :create_type: 594 One of `arvados.collection.FILE` or 595 `arvados.collection.COLLECTION`. If the path is not found, and value 596 of create_type is FILE then create and return a new ArvadosFile for 597 the last path component. If COLLECTION, then create and return a new 598 Collection for the last path component. 599 600 """ 601 602 pathcomponents = path.split("/", 1) 603 if pathcomponents[0]: 604 item = self._items.get(pathcomponents[0]) 605 if len(pathcomponents) == 1: 606 if item is None: 607 # create new file 608 if create_type == COLLECTION: 609 item = Subcollection(self, pathcomponents[0]) 610 else: 611 item = ArvadosFile(self, pathcomponents[0]) 612 self._items[pathcomponents[0]] = item 613 self.set_committed(False) 614 self.notify(ADD, self, pathcomponents[0], item) 615 return item 616 else: 617 if item is None: 618 # create new collection 619 item = Subcollection(self, pathcomponents[0]) 620 self._items[pathcomponents[0]] = item 621 self.set_committed(False) 622 self.notify(ADD, self, pathcomponents[0], item) 623 if isinstance(item, RichCollectionBase): 624 return item.find_or_create(pathcomponents[1], create_type) 625 else: 626 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 627 else: 628 return self
629 630 @synchronized
631 - def find(self, path):
632 """Recursively search the specified file path. 633 634 May return either a Collection or ArvadosFile. Return None if not 635 found. 636 If path is invalid (ex: starts with '/'), an IOError exception will be 637 raised. 638 639 """ 640 if not path: 641 raise errors.ArgumentError("Parameter 'path' is empty.") 642 643 pathcomponents = path.split("/", 1) 644 if pathcomponents[0] == '': 645 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) 646 647 item = self._items.get(pathcomponents[0]) 648 if item is None: 649 return None 650 elif len(pathcomponents) == 1: 651 return item 652 else: 653 if isinstance(item, RichCollectionBase): 654 if pathcomponents[1]: 655 return item.find(pathcomponents[1]) 656 else: 657 return item 658 else: 659 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
660 661 @synchronized
662 - def mkdirs(self, path):
663 """Recursive subcollection create. 664 665 Like `os.makedirs()`. Will create intermediate subcollections needed 666 to contain the leaf subcollection path. 667 668 """ 669 670 if self.find(path) != None: 671 raise IOError(errno.EEXIST, "Directory or file exists", path) 672 673 return self.find_or_create(path, COLLECTION)
674
675 - def open(self, path, mode="r", encoding=None):
676 """Open a file-like object for access. 677 678 :path: 679 path to a file in the collection 680 :mode: 681 a string consisting of "r", "w", or "a", optionally followed 682 by "b" or "t", optionally followed by "+". 683 :"b": 684 binary mode: write() accepts bytes, read() returns bytes. 685 :"t": 686 text mode (default): write() accepts strings, read() returns strings. 687 :"r": 688 opens for reading 689 :"r+": 690 opens for reading and writing. Reads/writes share a file pointer. 691 :"w", "w+": 692 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer. 693 :"a", "a+": 694 opens for reading and writing. All writes are appended to 695 the end of the file. Writing does not affect the file pointer for 696 reading. 697 698 """ 699 700 if not re.search(r'^[rwa][bt]?\+?$', mode): 701 raise errors.ArgumentError("Invalid mode {!r}".format(mode)) 702 703 if mode[0] == 'r' and '+' not in mode: 704 fclass = ArvadosFileReader 705 arvfile = self.find(path) 706 elif not self.writable(): 707 raise IOError(errno.EROFS, "Collection is read only") 708 else: 709 fclass = ArvadosFileWriter 710 arvfile = self.find_or_create(path, FILE) 711 712 if arvfile is None: 713 raise IOError(errno.ENOENT, "File not found", path) 714 if not isinstance(arvfile, ArvadosFile): 715 raise IOError(errno.EISDIR, "Is a directory", path) 716 717 if mode[0] == 'w': 718 arvfile.truncate(0) 719 720 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) 721 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) 722 if 'b' not in mode: 723 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader 724 f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) 725 return f
726
727 - def modified(self):
728 """Determine if the collection has been modified since last commited.""" 729 return not self.committed()
730 731 @synchronized
732 - def committed(self):
733 """Determine if the collection has been committed to the API server.""" 734 return self._committed
735 736 @synchronized
737 - def set_committed(self, value=True):
738 """Recursively set committed flag. 739 740 If value is True, set committed to be True for this and all children. 741 742 If value is False, set committed to be False for this and all parents. 743 """ 744 if value == self._committed: 745 return 746 if value: 747 for k,v in listitems(self._items): 748 v.set_committed(True) 749 self._committed = True 750 else: 751 self._committed = False 752 if self.parent is not None: 753 self.parent.set_committed(False)
754 755 @synchronized
756 - def __iter__(self):
757 """Iterate over names of files and collections contained in this collection.""" 758 return iter(viewkeys(self._items))
759 760 @synchronized
761 - def __getitem__(self, k):
762 """Get a file or collection that is directly contained by this collection. 763 764 If you want to search a path, use `find()` instead. 765 766 """ 767 return self._items[k]
768 769 @synchronized
770 - def __contains__(self, k):
771 """Test if there is a file or collection a directly contained by this collection.""" 772 return k in self._items
773 774 @synchronized
775 - def __len__(self):
776 """Get the number of items directly contained in this collection.""" 777 return len(self._items)
778 779 @must_be_writable 780 @synchronized
781 - def __delitem__(self, p):
782 """Delete an item by name which is directly contained by this collection.""" 783 del self._items[p] 784 self.set_committed(False) 785 self.notify(DEL, self, p, None)
786 787 @synchronized
788 - def keys(self):
789 """Get a list of names of files and collections directly contained in this collection.""" 790 return self._items.keys()
791 792 @synchronized
793 - def values(self):
794 """Get a list of files and collection objects directly contained in this collection.""" 795 return listvalues(self._items)
796 797 @synchronized
798 - def items(self):
799 """Get a list of (name, object) tuples directly contained in this collection.""" 800 return listitems(self._items)
801
802 - def exists(self, path):
803 """Test if there is a file or collection at `path`.""" 804 return self.find(path) is not None
805 806 @must_be_writable 807 @synchronized
808 - def remove(self, path, recursive=False):
809 """Remove the file or subcollection (directory) at `path`. 810 811 :recursive: 812 Specify whether to remove non-empty subcollections (True), or raise an error (False). 813 """ 814 815 if not path: 816 raise errors.ArgumentError("Parameter 'path' is empty.") 817 818 pathcomponents = path.split("/", 1) 819 item = self._items.get(pathcomponents[0]) 820 if item is None: 821 raise IOError(errno.ENOENT, "File not found", path) 822 if len(pathcomponents) == 1: 823 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive: 824 raise IOError(errno.ENOTEMPTY, "Directory not empty", path) 825 deleteditem = self._items[pathcomponents[0]] 826 del self._items[pathcomponents[0]] 827 self.set_committed(False) 828 self.notify(DEL, self, pathcomponents[0], deleteditem) 829 else: 830 item.remove(pathcomponents[1])
831
832 - def _clonefrom(self, source):
833 for k,v in listitems(source): 834 self._items[k] = v.clone(self, k)
835
836 - def clone(self):
837 raise NotImplementedError()
838 839 @must_be_writable 840 @synchronized
841 - def add(self, source_obj, target_name, overwrite=False, reparent=False):
842 """Copy or move a file or subcollection to this collection. 843 844 :source_obj: 845 An ArvadosFile, or Subcollection object 846 847 :target_name: 848 Destination item name. If the target name already exists and is a 849 file, this will raise an error unless you specify `overwrite=True`. 850 851 :overwrite: 852 Whether to overwrite target file if it already exists. 853 854 :reparent: 855 If True, source_obj will be moved from its parent collection to this collection. 856 If False, source_obj will be copied and the parent collection will be 857 unmodified. 858 859 """ 860 861 if target_name in self and not overwrite: 862 raise IOError(errno.EEXIST, "File already exists", target_name) 863 864 modified_from = None 865 if target_name in self: 866 modified_from = self[target_name] 867 868 # Actually make the move or copy. 869 if reparent: 870 source_obj._reparent(self, target_name) 871 item = source_obj 872 else: 873 item = source_obj.clone(self, target_name) 874 875 self._items[target_name] = item 876 self.set_committed(False) 877 if not self._has_remote_blocks and source_obj.has_remote_blocks(): 878 self.set_has_remote_blocks(True) 879 880 if modified_from: 881 self.notify(MOD, self, target_name, (modified_from, item)) 882 else: 883 self.notify(ADD, self, target_name, item)
884
885 - def _get_src_target(self, source, target_path, source_collection, create_dest):
886 if source_collection is None: 887 source_collection = self 888 889 # Find the object 890 if isinstance(source, basestring): 891 source_obj = source_collection.find(source) 892 if source_obj is None: 893 raise IOError(errno.ENOENT, "File not found", source) 894 sourcecomponents = source.split("/") 895 else: 896 source_obj = source 897 sourcecomponents = None 898 899 # Find parent collection the target path 900 targetcomponents = target_path.split("/") 901 902 # Determine the name to use. 903 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1] 904 905 if not target_name: 906 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.") 907 908 if create_dest: 909 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) 910 else: 911 if len(targetcomponents) > 1: 912 target_dir = self.find("/".join(targetcomponents[0:-1])) 913 else: 914 target_dir = self 915 916 if target_dir is None: 917 raise IOError(errno.ENOENT, "Target directory not found", target_name) 918 919 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents: 920 target_dir = target_dir[target_name] 921 target_name = sourcecomponents[-1] 922 923 return (source_obj, target_dir, target_name)
924 925 @must_be_writable 926 @synchronized
927 - def copy(self, source, target_path, source_collection=None, overwrite=False):
928 """Copy a file or subcollection to a new path in this collection. 929 930 :source: 931 A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object. 932 933 :target_path: 934 Destination file or path. If the target path already exists and is a 935 subcollection, the item will be placed inside the subcollection. If 936 the target path already exists and is a file, this will raise an error 937 unless you specify `overwrite=True`. 938 939 :source_collection: 940 Collection to copy `source_path` from (default `self`) 941 942 :overwrite: 943 Whether to overwrite target file if it already exists. 944 """ 945 946 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) 947 target_dir.add(source_obj, target_name, overwrite, False)
948 949 @must_be_writable 950 @synchronized
951 - def rename(self, source, target_path, source_collection=None, overwrite=False):
952 """Move a file or subcollection from `source_collection` to a new path in this collection. 953 954 :source: 955 A string with a path to source file or subcollection. 956 957 :target_path: 958 Destination file or path. If the target path already exists and is a 959 subcollection, the item will be placed inside the subcollection. If 960 the target path already exists and is a file, this will raise an error 961 unless you specify `overwrite=True`. 962 963 :source_collection: 964 Collection to copy `source_path` from (default `self`) 965 966 :overwrite: 967 Whether to overwrite target file if it already exists. 968 """ 969 970 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) 971 if not source_obj.writable(): 972 raise IOError(errno.EROFS, "Source collection is read only", source) 973 target_dir.add(source_obj, target_name, overwrite, True)
974
975 - def portable_manifest_text(self, stream_name="."):
976 """Get the manifest text for this collection, sub collections and files. 977 978 This method does not flush outstanding blocks to Keep. It will return 979 a normalized manifest with access tokens stripped. 980 981 :stream_name: 982 Name to use for this stream (directory) 983 984 """ 985 return self._get_manifest_text(stream_name, True, True)
986 987 @synchronized
988 - def manifest_text(self, stream_name=".", strip=False, normalize=False, 989 only_committed=False):
990 """Get the manifest text for this collection, sub collections and files. 991 992 This method will flush outstanding blocks to Keep. By default, it will 993 not normalize an unmodified manifest or strip access tokens. 994 995 :stream_name: 996 Name to use for this stream (directory) 997 998 :strip: 999 If True, remove signing tokens from block locators if present. 1000 If False (default), block locators are left unchanged. 1001 1002 :normalize: 1003 If True, always export the manifest text in normalized form 1004 even if the Collection is not modified. If False (default) and the collection 1005 is not modified, return the original manifest text even if it is not 1006 in normalized form. 1007 1008 :only_committed: 1009 If True, don't commit pending blocks. 1010 1011 """ 1012 1013 if not only_committed: 1014 self._my_block_manager().commit_all() 1015 return self._get_manifest_text(stream_name, strip, normalize, 1016 only_committed=only_committed)
1017 1018 @synchronized
1019 - def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1020 """Get the manifest text for this collection, sub collections and files. 1021 1022 :stream_name: 1023 Name to use for this stream (directory) 1024 1025 :strip: 1026 If True, remove signing tokens from block locators if present. 1027 If False (default), block locators are left unchanged. 1028 1029 :normalize: 1030 If True, always export the manifest text in normalized form 1031 even if the Collection is not modified. If False (default) and the collection 1032 is not modified, return the original manifest text even if it is not 1033 in normalized form. 1034 1035 :only_committed: 1036 If True, only include blocks that were already committed to Keep. 1037 1038 """ 1039 1040 if not self.committed() or self._manifest_text is None or normalize: 1041 stream = {} 1042 buf = [] 1043 sorted_keys = sorted(self.keys()) 1044 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]: 1045 # Create a stream per file `k` 1046 arvfile = self[filename] 1047 filestream = [] 1048 for segment in arvfile.segments(): 1049 loc = segment.locator 1050 if arvfile.parent._my_block_manager().is_bufferblock(loc): 1051 if only_committed: 1052 continue 1053 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator() 1054 if strip: 1055 loc = KeepLocator(loc).stripped() 1056 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size, 1057 segment.segment_offset, segment.range_size)) 1058 stream[filename] = filestream 1059 if stream: 1060 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n") 1061 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]: 1062 buf.append(self[dirname].manifest_text( 1063 stream_name=os.path.join(stream_name, dirname), 1064 strip=strip, normalize=True, only_committed=only_committed)) 1065 return "".join(buf) 1066 else: 1067 if strip: 1068 return self.stripped_manifest() 1069 else: 1070 return self._manifest_text
1071 1072 @synchronized
1073 - def _copy_remote_blocks(self, remote_blocks={}):
1074 """Scan through the entire collection and ask Keep to copy remote blocks. 1075 1076 When accessing a remote collection, blocks will have a remote signature 1077 (+R instead of +A). Collect these signatures and request Keep to copy the 1078 blocks to the local cluster, returning local (+A) signatures. 1079 1080 :remote_blocks: 1081 Shared cache of remote to local block mappings. This is used to avoid 1082 doing extra work when blocks are shared by more than one file in 1083 different subdirectories. 1084 1085 """ 1086 for item in self: 1087 remote_blocks = self[item]._copy_remote_blocks(remote_blocks) 1088 return remote_blocks
1089 1090 @synchronized
1091 - def diff(self, end_collection, prefix=".", holding_collection=None):
1092 """Generate list of add/modify/delete actions. 1093 1094 When given to `apply`, will change `self` to match `end_collection` 1095 1096 """ 1097 changes = [] 1098 if holding_collection is None: 1099 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep()) 1100 for k in self: 1101 if k not in end_collection: 1102 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, ""))) 1103 for k in end_collection: 1104 if k in self: 1105 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection): 1106 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection)) 1107 elif end_collection[k] != self[k]: 1108 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 1109 else: 1110 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, ""))) 1111 else: 1112 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, ""))) 1113 return changes
1114 1115 @must_be_writable 1116 @synchronized
1117 - def apply(self, changes):
1118 """Apply changes from `diff`. 1119 1120 If a change conflicts with a local change, it will be saved to an 1121 alternate path indicating the conflict. 1122 1123 """ 1124 if changes: 1125 self.set_committed(False) 1126 for change in changes: 1127 event_type = change[0] 1128 path = change[1] 1129 initial = change[2] 1130 local = self.find(path) 1131 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S", 1132 time.gmtime())) 1133 if event_type == ADD: 1134 if local is None: 1135 # No local file at path, safe to copy over new file 1136 self.copy(initial, path) 1137 elif local is not None and local != initial: 1138 # There is already local file and it is different: 1139 # save change to conflict file. 1140 self.copy(initial, conflictpath) 1141 elif event_type == MOD or event_type == TOK: 1142 final = change[3] 1143 if local == initial: 1144 # Local matches the "initial" item so it has not 1145 # changed locally and is safe to update. 1146 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile): 1147 # Replace contents of local file with new contents 1148 local.replace_contents(final) 1149 else: 1150 # Overwrite path with new item; this can happen if 1151 # path was a file and is now a collection or vice versa 1152 self.copy(final, path, overwrite=True) 1153 else: 1154 # Local is missing (presumably deleted) or local doesn't 1155 # match the "start" value, so save change to conflict file 1156 self.copy(final, conflictpath) 1157 elif event_type == DEL: 1158 if local == initial: 1159 # Local item matches "initial" value, so it is safe to remove. 1160 self.remove(path, recursive=True)
1161 # else, the file is modified or already removed, in either 1162 # case we don't want to try to remove it. 1163
1164 - def portable_data_hash(self):
1165 """Get the portable data hash for this collection's manifest.""" 1166 if self._manifest_locator and self.committed(): 1167 # If the collection is already saved on the API server, and it's committed 1168 # then return API server's PDH response. 1169 return self._portable_data_hash 1170 else: 1171 stripped = self.portable_manifest_text().encode() 1172 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1173 1174 @synchronized
1175 - def subscribe(self, callback):
1176 if self._callback is None: 1177 self._callback = callback 1178 else: 1179 raise errors.ArgumentError("A callback is already set on this collection.")
1180 1181 @synchronized
1182 - def unsubscribe(self):
1183 if self._callback is not None: 1184 self._callback = None
1185 1186 @synchronized
1187 - def notify(self, event, collection, name, item):
1188 if self._callback: 1189 self._callback(event, collection, name, item) 1190 self.root_collection().notify(event, collection, name, item)
1191 1192 @synchronized
1193 - def __eq__(self, other):
1194 if other is self: 1195 return True 1196 if not isinstance(other, RichCollectionBase): 1197 return False 1198 if len(self._items) != len(other): 1199 return False 1200 for k in self._items: 1201 if k not in other: 1202 return False 1203 if self._items[k] != other[k]: 1204 return False 1205 return True
1206
1207 - def __ne__(self, other):
1208 return not self.__eq__(other)
1209 1210 @synchronized
1211 - def flush(self):
1212 """Flush bufferblocks to Keep.""" 1213 for e in listvalues(self): 1214 e.flush()
1215
1216 1217 -class Collection(RichCollectionBase):
1218 """Represents the root of an Arvados Collection. 1219 1220 This class is threadsafe. The root collection object, all subcollections 1221 and files are protected by a single lock (i.e. each access locks the entire 1222 collection). 1223 1224 Brief summary of 1225 useful methods: 1226 1227 :To read an existing file: 1228 `c.open("myfile", "r")` 1229 1230 :To write a new file: 1231 `c.open("myfile", "w")` 1232 1233 :To determine if a file exists: 1234 `c.find("myfile") is not None` 1235 1236 :To copy a file: 1237 `c.copy("source", "dest")` 1238 1239 :To delete a file: 1240 `c.remove("myfile")` 1241 1242 :To save to an existing collection record: 1243 `c.save()` 1244 1245 :To save a new collection record: 1246 `c.save_new()` 1247 1248 :To merge remote changes into this object: 1249 `c.update()` 1250 1251 Must be associated with an API server Collection record (during 1252 initialization, or using `save_new`) to use `save` or `update` 1253 1254 """ 1255
1256 - def __init__(self, manifest_locator_or_text=None, 1257 api_client=None, 1258 keep_client=None, 1259 num_retries=None, 1260 parent=None, 1261 apiconfig=None, 1262 block_manager=None, 1263 replication_desired=None, 1264 put_threads=None):
1265 """Collection constructor. 1266 1267 :manifest_locator_or_text: 1268 An Arvados collection UUID, portable data hash, raw manifest 1269 text, or (if creating an empty collection) None. 1270 1271 :parent: 1272 the parent Collection, may be None. 1273 1274 :apiconfig: 1275 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN. 1276 Prefer this over supplying your own api_client and keep_client (except in testing). 1277 Will use default config settings if not specified. 1278 1279 :api_client: 1280 The API client object to use for requests. If not specified, create one using `apiconfig`. 1281 1282 :keep_client: 1283 the Keep client to use for requests. If not specified, create one using `apiconfig`. 1284 1285 :num_retries: 1286 the number of retries for API and Keep requests. 1287 1288 :block_manager: 1289 the block manager to use. If not specified, create one. 1290 1291 :replication_desired: 1292 How many copies should Arvados maintain. If None, API server default 1293 configuration applies. If not None, this value will also be used 1294 for determining the number of block copies being written. 1295 1296 """ 1297 super(Collection, self).__init__(parent) 1298 self._api_client = api_client 1299 self._keep_client = keep_client 1300 self._block_manager = block_manager 1301 self.replication_desired = replication_desired 1302 self.put_threads = put_threads 1303 1304 if apiconfig: 1305 self._config = apiconfig 1306 else: 1307 self._config = config.settings() 1308 1309 self.num_retries = num_retries if num_retries is not None else 0 1310 self._manifest_locator = None 1311 self._manifest_text = None 1312 self._portable_data_hash = None 1313 self._api_response = None 1314 self._past_versions = set() 1315 1316 self.lock = threading.RLock() 1317 self.events = None 1318 1319 if manifest_locator_or_text: 1320 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text): 1321 self._manifest_locator = manifest_locator_or_text 1322 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text): 1323 self._manifest_locator = manifest_locator_or_text 1324 if not self._has_local_collection_uuid(): 1325 self._has_remote_blocks = True 1326 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text): 1327 self._manifest_text = manifest_locator_or_text 1328 if '+R' in self._manifest_text: 1329 self._has_remote_blocks = True 1330 else: 1331 raise errors.ArgumentError( 1332 "Argument to CollectionReader is not a manifest or a collection UUID") 1333 1334 try: 1335 self._populate() 1336 except (IOError, errors.SyntaxError) as e: 1337 raise errors.ArgumentError("Error processing manifest text: %s", e)
1338
1339 - def root_collection(self):
1340 return self
1341
1342 - def get_properties(self):
1343 if self._api_response and self._api_response["properties"]: 1344 return self._api_response["properties"] 1345 else: 1346 return {}
1347
1348 - def get_trash_at(self):
1349 if self._api_response and self._api_response["trash_at"]: 1350 try: 1351 return ciso8601.parse_datetime(self._api_response["trash_at"]) 1352 except ValueError: 1353 return None 1354 else: 1355 return None
1356
1357 - def stream_name(self):
1358 return "."
1359
1360 - def writable(self):
1361 return True
1362 1363 @synchronized
1364 - def known_past_version(self, modified_at_and_portable_data_hash):
1365 return modified_at_and_portable_data_hash in self._past_versions
1366 1367 @synchronized 1368 @retry_method
1369 - def update(self, other=None, num_retries=None):
1370 """Merge the latest collection on the API server with the current collection.""" 1371 1372 if other is None: 1373 if self._manifest_locator is None: 1374 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") 1375 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries) 1376 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and 1377 response.get("portable_data_hash") != self.portable_data_hash()): 1378 # The record on the server is different from our current one, but we've seen it before, 1379 # so ignore it because it's already been merged. 1380 # However, if it's the same as our current record, proceed with the update, because we want to update 1381 # our tokens. 1382 return 1383 else: 1384 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash"))) 1385 other = CollectionReader(response["manifest_text"]) 1386 baseline = CollectionReader(self._manifest_text) 1387 self.apply(baseline.diff(other)) 1388 self._manifest_text = self.manifest_text()
1389 1390 @synchronized
1391 - def _my_api(self):
1392 if self._api_client is None: 1393 self._api_client = ThreadSafeApiCache(self._config) 1394 if self._keep_client is None: 1395 self._keep_client = self._api_client.keep 1396 return self._api_client
1397 1398 @synchronized
1399 - def _my_keep(self):
1400 if self._keep_client is None: 1401 if self._api_client is None: 1402 self._my_api() 1403 else: 1404 self._keep_client = KeepClient(api_client=self._api_client) 1405 return self._keep_client
1406 1407 @synchronized
1408 - def _my_block_manager(self):
1409 if self._block_manager is None: 1410 copies = (self.replication_desired or 1411 self._my_api()._rootDesc.get('defaultCollectionReplication', 1412 2)) 1413 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries) 1414 return self._block_manager
1415
1416 - def _remember_api_response(self, response):
1417 self._api_response = response 1418 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1419
1420 - def _populate_from_api_server(self):
1421 # As in KeepClient itself, we must wait until the last 1422 # possible moment to instantiate an API client, in order to 1423 # avoid tripping up clients that don't have access to an API 1424 # server. If we do build one, make sure our Keep client uses 1425 # it. If instantiation fails, we'll fall back to the except 1426 # clause, just like any other Collection lookup 1427 # failure. Return an exception, or None if successful. 1428 self._remember_api_response(self._my_api().collections().get( 1429 uuid=self._manifest_locator).execute( 1430 num_retries=self.num_retries)) 1431 self._manifest_text = self._api_response['manifest_text'] 1432 self._portable_data_hash = self._api_response['portable_data_hash'] 1433 # If not overriden via kwargs, we should try to load the 1434 # replication_desired from the API server 1435 if self.replication_desired is None: 1436 self.replication_desired = self._api_response.get('replication_desired', None)
1437
1438 - def _populate(self):
1439 if self._manifest_text is None: 1440 if self._manifest_locator is None: 1441 return 1442 else: 1443 self._populate_from_api_server() 1444 self._baseline_manifest = self._manifest_text 1445 self._import_manifest(self._manifest_text)
1446
1447 - def _has_collection_uuid(self):
1448 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1449
1450 - def _has_local_collection_uuid(self):
1451 return self._has_collection_uuid and \ 1452 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1453
1454 - def __enter__(self):
1455 return self
1456
1457 - def __exit__(self, exc_type, exc_value, traceback):
1458 """Support scoped auto-commit in a with: block.""" 1459 if exc_type is None: 1460 if self.writable() and self._has_collection_uuid(): 1461 self.save() 1462 self.stop_threads()
1463
1464 - def stop_threads(self):
1465 if self._block_manager is not None: 1466 self._block_manager.stop_threads()
1467 1468 @synchronized
1469 - def manifest_locator(self):
1470 """Get the manifest locator, if any. 1471 1472 The manifest locator will be set when the collection is loaded from an 1473 API server record or the portable data hash of a manifest. 1474 1475 The manifest locator will be None if the collection is newly created or 1476 was created directly from manifest text. The method `save_new()` will 1477 assign a manifest locator. 1478 1479 """ 1480 return self._manifest_locator
1481 1482 @synchronized
1483 - def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1484 if new_config is None: 1485 new_config = self._config 1486 if readonly: 1487 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config) 1488 else: 1489 newcollection = Collection(parent=new_parent, apiconfig=new_config) 1490 1491 newcollection._clonefrom(self) 1492 return newcollection
1493 1494 @synchronized
1495 - def api_response(self):
1496 """Returns information about this Collection fetched from the API server. 1497 1498 If the Collection exists in Keep but not the API server, currently 1499 returns None. Future versions may provide a synthetic response. 1500 1501 """ 1502 return self._api_response
1503
1504 - def find_or_create(self, path, create_type):
1505 """See `RichCollectionBase.find_or_create`""" 1506 if path == ".": 1507 return self 1508 else: 1509 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1510
1511 - def find(self, path):
1512 """See `RichCollectionBase.find`""" 1513 if path == ".": 1514 return self 1515 else: 1516 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1517
1518 - def remove(self, path, recursive=False):
1519 """See `RichCollectionBase.remove`""" 1520 if path == ".": 1521 raise errors.ArgumentError("Cannot remove '.'") 1522 else: 1523 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1524 1525 @must_be_writable 1526 @synchronized 1527 @retry_method
1528 - def save(self, 1529 properties=None, 1530 storage_classes=None, 1531 trash_at=None, 1532 merge=True, 1533 num_retries=None):
1534 """Save collection to an existing collection record. 1535 1536 Commit pending buffer blocks to Keep, merge with remote record (if 1537 merge=True, the default), and update the collection record. Returns 1538 the current manifest text. 1539 1540 Will raise AssertionError if not associated with a collection record on 1541 the API server. If you want to save a manifest to Keep only, see 1542 `save_new()`. 1543 1544 :properties: 1545 Additional properties of collection. This value will replace any existing 1546 properties of collection. 1547 1548 :storage_classes: 1549 Specify desirable storage classes to be used when writing data to Keep. 1550 1551 :trash_at: 1552 A collection is *expiring* when it has a *trash_at* time in the future. 1553 An expiring collection can be accessed as normal, 1554 but is scheduled to be trashed automatically at the *trash_at* time. 1555 1556 :merge: 1557 Update and merge remote changes before saving. Otherwise, any 1558 remote changes will be ignored and overwritten. 1559 1560 :num_retries: 1561 Retry count on API calls (if None, use the collection default) 1562 1563 """ 1564 if properties and type(properties) is not dict: 1565 raise errors.ArgumentError("properties must be dictionary type.") 1566 1567 if storage_classes and type(storage_classes) is not list: 1568 raise errors.ArgumentError("storage_classes must be list type.") 1569 1570 if trash_at and type(trash_at) is not datetime.datetime: 1571 raise errors.ArgumentError("trash_at must be datetime type.") 1572 1573 body={} 1574 if properties: 1575 body["properties"] = properties 1576 if storage_classes: 1577 body["storage_classes_desired"] = storage_classes 1578 if trash_at: 1579 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1580 body["trash_at"] = t 1581 1582 if not self.committed(): 1583 if self._has_remote_blocks: 1584 # Copy any remote blocks to the local cluster. 1585 self._copy_remote_blocks(remote_blocks={}) 1586 self._has_remote_blocks = False 1587 if not self._has_collection_uuid(): 1588 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.") 1589 elif not self._has_local_collection_uuid(): 1590 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.") 1591 1592 self._my_block_manager().commit_all() 1593 1594 if merge: 1595 self.update() 1596 1597 text = self.manifest_text(strip=False) 1598 body['manifest_text'] = text 1599 1600 self._remember_api_response(self._my_api().collections().update( 1601 uuid=self._manifest_locator, 1602 body=body 1603 ).execute(num_retries=num_retries)) 1604 self._manifest_text = self._api_response["manifest_text"] 1605 self._portable_data_hash = self._api_response["portable_data_hash"] 1606 self.set_committed(True) 1607 elif body: 1608 self._remember_api_response(self._my_api().collections().update( 1609 uuid=self._manifest_locator, 1610 body=body 1611 ).execute(num_retries=num_retries)) 1612 1613 return self._manifest_text
1614 1615 1616 @must_be_writable 1617 @synchronized 1618 @retry_method
1619 - def save_new(self, name=None, 1620 create_collection_record=True, 1621 owner_uuid=None, 1622 properties=None, 1623 storage_classes=None, 1624 trash_at=None, 1625 ensure_unique_name=False, 1626 num_retries=None):
1627 """Save collection to a new collection record. 1628 1629 Commit pending buffer blocks to Keep and, when create_collection_record 1630 is True (default), create a new collection record. After creating a 1631 new collection record, this Collection object will be associated with 1632 the new record used by `save()`. Returns the current manifest text. 1633 1634 :name: 1635 The collection name. 1636 1637 :create_collection_record: 1638 If True, create a collection record on the API server. 1639 If False, only commit blocks to Keep and return the manifest text. 1640 1641 :owner_uuid: 1642 the user, or project uuid that will own this collection. 1643 If None, defaults to the current user. 1644 1645 :properties: 1646 Additional properties of collection. This value will replace any existing 1647 properties of collection. 1648 1649 :storage_classes: 1650 Specify desirable storage classes to be used when writing data to Keep. 1651 1652 :trash_at: 1653 A collection is *expiring* when it has a *trash_at* time in the future. 1654 An expiring collection can be accessed as normal, 1655 but is scheduled to be trashed automatically at the *trash_at* time. 1656 1657 :ensure_unique_name: 1658 If True, ask the API server to rename the collection 1659 if it conflicts with a collection with the same name and owner. If 1660 False, a name conflict will result in an error. 1661 1662 :num_retries: 1663 Retry count on API calls (if None, use the collection default) 1664 1665 """ 1666 if properties and type(properties) is not dict: 1667 raise errors.ArgumentError("properties must be dictionary type.") 1668 1669 if storage_classes and type(storage_classes) is not list: 1670 raise errors.ArgumentError("storage_classes must be list type.") 1671 1672 if trash_at and type(trash_at) is not datetime.datetime: 1673 raise errors.ArgumentError("trash_at must be datetime type.") 1674 1675 if self._has_remote_blocks: 1676 # Copy any remote blocks to the local cluster. 1677 self._copy_remote_blocks(remote_blocks={}) 1678 self._has_remote_blocks = False 1679 1680 self._my_block_manager().commit_all() 1681 text = self.manifest_text(strip=False) 1682 1683 if create_collection_record: 1684 if name is None: 1685 name = "New collection" 1686 ensure_unique_name = True 1687 1688 body = {"manifest_text": text, 1689 "name": name, 1690 "replication_desired": self.replication_desired} 1691 if owner_uuid: 1692 body["owner_uuid"] = owner_uuid 1693 if properties: 1694 body["properties"] = properties 1695 if storage_classes: 1696 body["storage_classes_desired"] = storage_classes 1697 if trash_at: 1698 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ") 1699 body["trash_at"] = t 1700 1701 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)) 1702 text = self._api_response["manifest_text"] 1703 1704 self._manifest_locator = self._api_response["uuid"] 1705 self._portable_data_hash = self._api_response["portable_data_hash"] 1706 1707 self._manifest_text = text 1708 self.set_committed(True) 1709 1710 return text
1711 1712 _token_re = re.compile(r'(\S+)(\s+|$)') 1713 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*') 1714 _segment_re = re.compile(r'(\d+):(\d+):(\S+)') 1715
1716 - def _unescape_manifest_path(self, path):
1717 return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1718 1719 @synchronized
1720 - def _import_manifest(self, manifest_text):
1721 """Import a manifest into a `Collection`. 1722 1723 :manifest_text: 1724 The manifest text to import from. 1725 1726 """ 1727 if len(self) > 0: 1728 raise ArgumentError("Can only import manifest into an empty collection") 1729 1730 STREAM_NAME = 0 1731 BLOCKS = 1 1732 SEGMENTS = 2 1733 1734 stream_name = None 1735 state = STREAM_NAME 1736 1737 for token_and_separator in self._token_re.finditer(manifest_text): 1738 tok = token_and_separator.group(1) 1739 sep = token_and_separator.group(2) 1740 1741 if state == STREAM_NAME: 1742 # starting a new stream 1743 stream_name = self._unescape_manifest_path(tok) 1744 blocks = [] 1745 segments = [] 1746 streamoffset = 0 1747 state = BLOCKS 1748 self.find_or_create(stream_name, COLLECTION) 1749 continue 1750 1751 if state == BLOCKS: 1752 block_locator = self._block_re.match(tok) 1753 if block_locator: 1754 blocksize = int(block_locator.group(1)) 1755 blocks.append(Range(tok, streamoffset, blocksize, 0)) 1756 streamoffset += blocksize 1757 else: 1758 state = SEGMENTS 1759 1760 if state == SEGMENTS: 1761 file_segment = self._segment_re.match(tok) 1762 if file_segment: 1763 pos = int(file_segment.group(1)) 1764 size = int(file_segment.group(2)) 1765 name = self._unescape_manifest_path(file_segment.group(3)) 1766 if name.split('/')[-1] == '.': 1767 # placeholder for persisting an empty directory, not a real file 1768 if len(name) > 2: 1769 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION) 1770 else: 1771 filepath = os.path.join(stream_name, name) 1772 afile = self.find_or_create(filepath, FILE) 1773 if isinstance(afile, ArvadosFile): 1774 afile.add_segment(blocks, pos, size) 1775 else: 1776 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath) 1777 else: 1778 # error! 1779 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok) 1780 1781 if sep == "\n": 1782 stream_name = None 1783 state = STREAM_NAME 1784 1785 self.set_committed(True)
1786 1787 @synchronized
1788 - def notify(self, event, collection, name, item):
1789 if self._callback: 1790 self._callback(event, collection, name, item)
1791
1792 1793 -class Subcollection(RichCollectionBase):
1794 """This is a subdirectory within a collection that doesn't have its own API 1795 server record. 1796 1797 Subcollection locking falls under the umbrella lock of its root collection. 1798 1799 """ 1800
1801 - def __init__(self, parent, name):
1802 super(Subcollection, self).__init__(parent) 1803 self.lock = self.root_collection().lock 1804 self._manifest_text = None 1805 self.name = name 1806 self.num_retries = parent.num_retries
1807
1808 - def root_collection(self):
1809 return self.parent.root_collection()
1810
1811 - def writable(self):
1812 return self.root_collection().writable()
1813
1814 - def _my_api(self):
1815 return self.root_collection()._my_api()
1816
1817 - def _my_keep(self):
1818 return self.root_collection()._my_keep()
1819
1820 - def _my_block_manager(self):
1821 return self.root_collection()._my_block_manager()
1822
1823 - def stream_name(self):
1824 return os.path.join(self.parent.stream_name(), self.name)
1825 1826 @synchronized
1827 - def clone(self, new_parent, new_name):
1828 c = Subcollection(new_parent, new_name) 1829 c._clonefrom(self) 1830 return c
1831 1832 @must_be_writable 1833 @synchronized
1834 - def _reparent(self, newparent, newname):
1835 self.set_committed(False) 1836 self.flush() 1837 self.parent.remove(self.name, recursive=True) 1838 self.parent = newparent 1839 self.name = newname 1840 self.lock = self.parent.root_collection().lock
1841 1842 @synchronized
1843 - def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1844 """Encode empty directories by using an \056-named (".") empty file""" 1845 if len(self._items) == 0: 1846 return "%s %s 0:0:\\056\n" % ( 1847 escape(stream_name), config.EMPTY_BLOCK_LOCATOR) 1848 return super(Subcollection, self)._get_manifest_text(stream_name, 1849 strip, normalize, 1850 only_committed)
1851
1852 1853 -class CollectionReader(Collection):
1854 """A read-only collection object. 1855 1856 Initialize from a collection UUID or portable data hash, or raw 1857 manifest text. See `Collection` constructor for detailed options. 1858 1859 """
1860 - def __init__(self, manifest_locator_or_text, *args, **kwargs):
1861 self._in_init = True 1862 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs) 1863 self._in_init = False 1864 1865 # Forego any locking since it should never change once initialized. 1866 self.lock = NoopLock() 1867 1868 # Backwards compatability with old CollectionReader 1869 # all_streams() and all_files() 1870 self._streams = None
1871
1872 - def writable(self):
1873 return self._in_init
1874
1875 - def _populate_streams(orig_func):
1876 @functools.wraps(orig_func) 1877 def populate_streams_wrapper(self, *args, **kwargs): 1878 # Defer populating self._streams until needed since it creates a copy of the manifest. 1879 if self._streams is None: 1880 if self._manifest_text: 1881 self._streams = [sline.split() 1882 for sline in self._manifest_text.split("\n") 1883 if sline] 1884 else: 1885 self._streams = [] 1886 return orig_func(self, *args, **kwargs)
1887 return populate_streams_wrapper
1888 1889 @_populate_streams
1890 - def normalize(self):
1891 """Normalize the streams returned by `all_streams`. 1892 1893 This method is kept for backwards compatability and only affects the 1894 behavior of `all_streams()` and `all_files()` 1895 1896 """ 1897 1898 # Rearrange streams 1899 streams = {} 1900 for s in self.all_streams(): 1901 for f in s.all_files(): 1902 streamname, filename = split(s.name() + "/" + f.name()) 1903 if streamname not in streams: 1904 streams[streamname] = {} 1905 if filename not in streams[streamname]: 1906 streams[streamname][filename] = [] 1907 for r in f.segments: 1908 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size)) 1909 1910 self._streams = [normalize_stream(s, streams[s]) 1911 for s in sorted(streams)]
1912 @_populate_streams
1913 - def all_streams(self):
1914 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries) 1915 for s in self._streams]
1916 1917 @_populate_streams
1918 - def all_files(self):
1919 for s in self.all_streams(): 1920 for f in s.all_files(): 1921 yield f
1922