arvados.keep
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import copy
import collections
import datetime
import hashlib
import errno
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
import urllib.parse
import traceback
import weakref

from io import BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

from ._internal import basedirs, diskcache, Timer
from ._internal.pycurl import PyCurlHelper

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
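# Example (illustrative sketch): parsing a signed locator string.  The hash,
# signature, and expiry values below are made up for demonstration.
#
#     loc = KeepLocator(
#         'acbd18db4cc2f85cedef654fccc4a4d8+3'
#         '+A27d4a8cf9f5a292dbdd4c5d2c6e93a4a1b592f10@653a3e3c')
#     loc.md5sum                # 'acbd18db4cc2f85cedef654fccc4a4d8'
#     loc.size                  # 3
#     loc.permission_expired()  # True once the @653a3e3c timestamp is past
#     loc.stripped()            # 'acbd18db4cc2f85cedef654fccc4a4d8+3'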

class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        self.cache_total = 0
        if self._disk_cache:
            self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()

    class _CacheSlot:
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None


    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                continue

            sz = slot.size()
            slot.evict()
            self.cache_total -= sz
            del self._cache[slot.locator]


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache._CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            pass
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()
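# Example (illustrative sketch): the reserve/set/get protocol KeepClient uses
# with this cache.  The data block is a stand-in.
#
#     cache = KeepBlockCache(cache_max=64 * 1024 * 1024)
#     data = b'foo'
#     md5 = hashlib.md5(data).hexdigest()
#     slot, first = cache.reserve_cache(md5)
#     if first:
#         cache.set(slot, data)   # fill the freshly reserved slot
#     slot = cache.get(md5)       # later lookups return the same slot
#     if slot is not None:
#         block = slot.get()      # waits until the slot is ready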

class _Counter:
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class _KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A _KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super().__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue().decode('utf-8'),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True
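    # Example (illustrative sketch): one GET transaction via _KeepService.
    # The service root and token are placeholders; real roots come from
    # KeepClient.weighted_service_roots().
    #
    #     svc = KeepClient._KeepService(
    #         'https://keep0.zzzzz.arvadosapi.com/',
    #         headers={'Authorization': 'Bearer <token>'})
    #     loc = KeepLocator('acbd18db4cc2f85cedef654fccc4a4d8+3')
    #     body = svc.get(loc, timeout=(2, 256, 32768))
    #     if body is None:
    #         print(svc.last_result()['error'])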

    class _KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class _KeepWriterThreadPool:
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient._KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response


    class _KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError):
            """Exception for failed Keep writes

            TODO: Move this class to the module top level and document it

            @private
            """


        def __init__(self, queue, data, data_hash, timeout=None):
            super().__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in _KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("_KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
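    # Worked example (from the parsing logic in do_task() above): a response
    # header like
    #
    #     X-Keep-Storage-Classes-Confirmed: default=2, archival=1
    #
    # is parsed into {'default': 2, 'archival': 1}.  If the header is missing
    # or malformed, classes_confirmed is None and the writer queue disables
    # storage-class tracking.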

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds.  Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = _Counter()
        self.download_counter = _Counter()
        self.put_counter = _Counter()
        self.get_counter = _Counter()
        self.hits_counter = _Counter()
        self.misses_counter = _Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
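    # Example (illustrative sketch): constructing a KeepClient.  Host and
    # token come from the standard Arvados environment settings.
    #
    #     import arvados
    #     api = arvados.api('v1')   # uses ARVADOS_API_HOST / ARVADOS_API_TOKEN
    #     kc = arvados.KeepClient(api_client=api, num_retries=5)
    #
    #     # Or, talking directly to a keepproxy (URL is a placeholder):
    #     kc = arvados.KeepClient(api_token='<token>',
    #                             proxy='https://keep.zzzzz.arvadosapi.com/')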
The `attempt_number` indicates 934 how many times the operation has been tried already (starting from 0 935 for the first try), and scales the connection timeout portion of the 936 return value accordingly. 937 938 """ 939 # TODO(twp): the timeout should be a property of a 940 # _KeepService, not a KeepClient. See #4488. 941 t = self.proxy_timeout if self.using_proxy else self.timeout 942 if len(t) == 2: 943 return (t[0] * (1 << attempt_number), t[1]) 944 else: 945 return (t[0] * (1 << attempt_number), t[1], t[2]) 946 def _any_nondisk_services(self, service_list): 947 return any(ks.get('service_type', 'disk') != 'disk' 948 for ks in service_list) 949 950 def build_services_list(self, force_rebuild=False): 951 if (self._static_services_list or 952 (self._keep_services and not force_rebuild)): 953 return 954 with self.lock: 955 try: 956 keep_services = self.api_client.keep_services().accessible() 957 except Exception: # API server predates Keep services. 958 keep_services = self.api_client.keep_disks().list() 959 960 # Gateway services are only used when specified by UUID, 961 # so there's nothing to gain by filtering them by 962 # service_type. 963 self._gateway_services = {ks['uuid']: ks for ks in 964 keep_services.execute()['items']} 965 if not self._gateway_services: 966 raise arvados.errors.NoKeepServersError() 967 968 # Precompute the base URI for each service. 969 for r in self._gateway_services.values(): 970 host = r['service_host'] 971 if not host.startswith('[') and host.find(':') >= 0: 972 # IPv6 URIs must be formatted like http://[::1]:80/... 973 host = '[' + host + ']' 974 r['_service_root'] = "{}://{}:{:d}/".format( 975 'https' if r['service_ssl_flag'] else 'http', 976 host, 977 r['service_port']) 978 979 _logger.debug(str(self._gateway_services)) 980 self._keep_services = [ 981 ks for ks in self._gateway_services.values() 982 if not ks.get('service_type', '').startswith('gateway:')] 983 self._writable_services = [ks for ks in self._keep_services 984 if not ks.get('read_only')] 985 986 # For disk type services, max_replicas_per_service is 1 987 # It is unknown (unlimited) for other service types. 988 if self._any_nondisk_services(self._writable_services): 989 self.max_replicas_per_service = None 990 else: 991 self.max_replicas_per_service = 1 992 993 def _service_weight(self, data_hash, service_uuid): 994 """Compute the weight of a Keep service endpoint for a data 995 block with a known hash. 996 997 The weight is md5(h + u) where u is the last 15 characters of 998 the service endpoint's UUID. 999 """ 1000 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest() 1001 1002 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False): 1003 """Return an array of Keep service endpoints, in the order in 1004 which they should be probed when reading or writing data with 1005 the given hash+hints. 1006 """ 1007 self.build_services_list(force_rebuild) 1008 1009 sorted_roots = [] 1010 # Use the services indicated by the given +K@... remote 1011 # service hints, if any are present and can be resolved to a 1012 # URI. 

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to _KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "Bearer %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self._KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # this is a request for a prefetch to fill in
                        # the cache, don't need to wait for the
                        # result, so if it is already in flight return
                        # immediately.  Clear 'slot' to prevent
                        # finally block from calling slot.set()
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:])
                               )])
            # Map root URLs to their _KeepService objects.
            roots_map = {
                root: self._KeepService(root, self._user_agent_pool,
                                        upload_counter=self.upload_counter,
                                        download_counter=self.download_counter,
                                        headers=headers,
                                        insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            roots_map = {}
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query _KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
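    # Example (illustrative sketch): reading a block back through get()/head(),
    # given a KeepClient `kc` as constructed above.
    #
    #     loc = kc.put(b'foo')   # signed locator, e.g. '<md5>+3+A<sig>@<expiry>'
    #     data = kc.get(loc)     # b'foo'
    #     kc.head(loc)           # True if the block exists, without downloading it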
1265 """ 1266 1267 classes = classes or self._default_classes 1268 1269 if not isinstance(data, bytes): 1270 data = data.encode() 1271 1272 self.put_counter.add(1) 1273 1274 data_hash = hashlib.md5(data).hexdigest() 1275 loc_s = data_hash + '+' + str(len(data)) 1276 if copies < 1: 1277 return loc_s 1278 locator = KeepLocator(loc_s) 1279 1280 request_id = (request_id or 1281 (hasattr(self, 'api_client') and self.api_client.request_id) or 1282 arvados.util.new_request_id()) 1283 headers = { 1284 'X-Request-Id': request_id, 1285 'X-Keep-Desired-Replicas': str(copies), 1286 } 1287 roots_map = {} 1288 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1289 backoff_start=2) 1290 done_copies = 0 1291 done_classes = [] 1292 for tries_left in loop: 1293 try: 1294 sorted_roots = self.map_new_services( 1295 roots_map, locator, 1296 force_rebuild=(tries_left < num_retries), 1297 need_writable=True, 1298 headers=headers) 1299 except Exception as error: 1300 loop.save_result(error) 1301 continue 1302 1303 pending_classes = [] 1304 if done_classes is not None: 1305 pending_classes = list(set(classes) - set(done_classes)) 1306 writer_pool = KeepClient._KeepWriterThreadPool( 1307 data=data, 1308 data_hash=data_hash, 1309 copies=copies - done_copies, 1310 max_service_replicas=self.max_replicas_per_service, 1311 timeout=self.current_timeout(num_retries - tries_left), 1312 classes=pending_classes, 1313 ) 1314 for service_root, ks in [(root, roots_map[root]) 1315 for root in sorted_roots]: 1316 if ks.finished(): 1317 continue 1318 writer_pool.add_task(ks, service_root) 1319 writer_pool.join() 1320 pool_copies, pool_classes = writer_pool.done() 1321 done_copies += pool_copies 1322 if (done_classes is not None) and (pool_classes is not None): 1323 done_classes += pool_classes 1324 loop.save_result( 1325 (done_copies >= copies and set(done_classes) == set(classes), 1326 writer_pool.total_task_nr)) 1327 else: 1328 # Old keepstore contacted without storage classes support: 1329 # success is determined only by successful copies. 1330 # 1331 # Disable storage classes tracking from this point forward. 
1332 if not self._storage_classes_unsupported_warning: 1333 self._storage_classes_unsupported_warning = True 1334 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster") 1335 done_classes = None 1336 loop.save_result( 1337 (done_copies >= copies, writer_pool.total_task_nr)) 1338 1339 if loop.success(): 1340 return writer_pool.response() 1341 if not roots_map: 1342 raise arvados.errors.KeepWriteError( 1343 "[{}] failed to write {}: no Keep services available ({})".format( 1344 request_id, data_hash, loop.last_result())) 1345 else: 1346 service_errors = ((key, roots_map[key].last_result()['error']) 1347 for key in sorted_roots 1348 if roots_map[key].last_result()['error']) 1349 raise arvados.errors.KeepWriteError( 1350 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format( 1351 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service") 1352 1353 def _block_prefetch_worker(self): 1354 """The background downloader thread.""" 1355 while True: 1356 try: 1357 b = self._prefetch_queue.get() 1358 if b is None: 1359 return 1360 self.get(b, prefetch=True) 1361 except Exception: 1362 _logger.exception("Exception doing block prefetch") 1363 1364 def _start_prefetch_threads(self): 1365 if self._prefetch_threads is None: 1366 with self.lock: 1367 if self._prefetch_threads is not None: 1368 return 1369 self._prefetch_queue = queue.Queue() 1370 self._prefetch_threads = [] 1371 for i in range(0, self.num_prefetch_threads): 1372 thread = threading.Thread(target=self._block_prefetch_worker) 1373 self._prefetch_threads.append(thread) 1374 thread.daemon = True 1375 thread.start() 1376 1377 def block_prefetch(self, locator): 1378 """ 1379 This relies on the fact that KeepClient implements a block cache, 1380 so repeated requests for the same block will not result in repeated 1381 downloads (unless the block is evicted from the cache.) This method 1382 does not block. 1383 """ 1384 1385 if self.block_cache.get(locator) is not None: 1386 return 1387 1388 self._start_prefetch_threads() 1389 self._prefetch_queue.put(locator) 1390 1391 def stop_prefetch_threads(self): 1392 with self.lock: 1393 if self._prefetch_threads is not None: 1394 for t in self._prefetch_threads: 1395 self._prefetch_queue.put(None) 1396 for t in self._prefetch_threads: 1397 t.join() 1398 self._prefetch_threads = None 1399 self._prefetch_queue = None 1400 1401 def local_store_put(self, data, copies=1, num_retries=None, classes=[]): 1402 """A stub for put(). 1403 1404 This method is used in place of the real put() method when 1405 using local storage (see constructor's local_store argument). 1406 1407 copies and num_retries arguments are ignored: they are here 1408 only for the sake of offering the same call signature as 1409 put(). 1410 1411 Data stored this way can be retrieved via local_store_get(). 
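    # Example (illustrative sketch): writing a block with replication and
    # storage-class requirements.  Storage class names are cluster-specific;
    # 'default' is a placeholder.
    #
    #     loc = kc.put(b'my data', copies=2, classes=['default'])
    #     # On success, loc is a signed locator string ('<md5>+<size>+A...')
    #     # suitable for a collection manifest or a later kc.get(loc).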
1412 """ 1413 md5 = hashlib.md5(data).hexdigest() 1414 locator = '%s+%d' % (md5, len(data)) 1415 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f: 1416 f.write(data) 1417 os.rename(os.path.join(self.local_store, md5 + '.tmp'), 1418 os.path.join(self.local_store, md5)) 1419 return locator 1420 1421 def local_store_get(self, loc_s, num_retries=None): 1422 """Companion to local_store_put().""" 1423 try: 1424 locator = KeepLocator(loc_s) 1425 except ValueError: 1426 raise arvados.errors.NotFoundError( 1427 "Invalid data locator: '%s'" % loc_s) 1428 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1429 return b'' 1430 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f: 1431 return f.read() 1432 1433 def local_store_head(self, loc_s, num_retries=None): 1434 """Companion to local_store_put().""" 1435 try: 1436 locator = KeepLocator(loc_s) 1437 except ValueError: 1438 raise arvados.errors.NotFoundError( 1439 "Invalid data locator: '%s'" % loc_s) 1440 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1441 return True 1442 if os.path.exists(os.path.join(self.local_store, locator.md5sum)): 1443 return True
51class KeepLocator(object): 52 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0) 53 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$') 54 55 def __init__(self, locator_str): 56 self.hints = [] 57 self._perm_sig = None 58 self._perm_expiry = None 59 pieces = iter(locator_str.split('+')) 60 self.md5sum = next(pieces) 61 try: 62 self.size = int(next(pieces)) 63 except StopIteration: 64 self.size = None 65 for hint in pieces: 66 if self.HINT_RE.match(hint) is None: 67 raise ValueError("invalid hint format: {}".format(hint)) 68 elif hint.startswith('A'): 69 self.parse_permission_hint(hint) 70 else: 71 self.hints.append(hint) 72 73 def __str__(self): 74 return '+'.join( 75 str(s) 76 for s in [self.md5sum, self.size, 77 self.permission_hint()] + self.hints 78 if s is not None) 79 80 def stripped(self): 81 if self.size is not None: 82 return "%s+%i" % (self.md5sum, self.size) 83 else: 84 return self.md5sum 85 86 def _make_hex_prop(name, length): 87 # Build and return a new property with the given name that 88 # must be a hex string of the given length. 89 data_name = '_{}'.format(name) 90 def getter(self): 91 return getattr(self, data_name) 92 def setter(self, hex_str): 93 if not arvados.util.is_hex(hex_str, length): 94 raise ValueError("{} is not a {}-digit hex string: {!r}". 95 format(name, length, hex_str)) 96 setattr(self, data_name, hex_str) 97 return property(getter, setter) 98 99 md5sum = _make_hex_prop('md5sum', 32) 100 perm_sig = _make_hex_prop('perm_sig', 40) 101 102 @property 103 def perm_expiry(self): 104 return self._perm_expiry 105 106 @perm_expiry.setter 107 def perm_expiry(self, value): 108 if not arvados.util.is_hex(value, 1, 8): 109 raise ValueError( 110 "permission timestamp must be a hex Unix timestamp: {}". 111 format(value)) 112 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16)) 113 114 def permission_hint(self): 115 data = [self.perm_sig, self.perm_expiry] 116 if None in data: 117 return None 118 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds()) 119 return "A{}@{:08x}".format(*data) 120 121 def parse_permission_hint(self, s): 122 try: 123 self.perm_sig, self.perm_expiry = s[1:].split('@', 1) 124 except IndexError: 125 raise ValueError("bad permission hint {}".format(s)) 126 127 def permission_expired(self, as_of_dt=None): 128 if self.perm_expiry is None: 129 return False 130 elif as_of_dt is None: 131 as_of_dt = datetime.datetime.now() 132 return self.perm_expiry <= as_of_dt
55 def __init__(self, locator_str): 56 self.hints = [] 57 self._perm_sig = None 58 self._perm_expiry = None 59 pieces = iter(locator_str.split('+')) 60 self.md5sum = next(pieces) 61 try: 62 self.size = int(next(pieces)) 63 except StopIteration: 64 self.size = None 65 for hint in pieces: 66 if self.HINT_RE.match(hint) is None: 67 raise ValueError("invalid hint format: {}".format(hint)) 68 elif hint.startswith('A'): 69 self.parse_permission_hint(hint) 70 else: 71 self.hints.append(hint)
class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors: one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of the maximum number of file
                # handles, so we use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux, so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other uses.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # The available-space calculation incorporates existing cache usage.
                existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # Pick the smallest of:
                #   10% of total disk size
                #   25% of available space
                #   max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        self.cache_total = 0
        if self._disk_cache:
            self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()

    class _CacheSlot:
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Block until the slot has been filled in (or the fetch
            # has failed, leaving content as None).
            self.ready.wait()
            return self.content

        def set(self, value):
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None

    def _resize_cache(self, cache_max, max_slots):
        # Try to make sure the contents of the cache do not exceed
        # the supplied maximums.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                continue

            sz = slot.size()
            slot.evict()
            self.cache_total -= sz
            del self._cache[slot.locator]

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, we need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms, or until woken up
                    # by another thread.
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache._CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception:
            pass
        finally:
            # Check whether we should evict things from the cache.
            # Either we added a new thing, or there was an error and
            # we may have adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time.  The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which may let the operation
            # succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()
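For reference, a short sketch of constructing the two cache flavors set up by this constructor; the directory shown is illustrative, not a required path:

    # RAM cache: defaults to 512 slots and 256 MiB.
    ram_cache = KeepBlockCache()

    # Disk-backed cache; with cache_max=0 the limit is derived from
    # disk size, free space, and the slot count as computed above.
    disk_cache = KeepBlockCache(disk_cache=True,
                                disk_cache_dir='/tmp/keep-cache')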
__init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None)

    Set up a RAM-backed block cache, or a disk-backed one when disk_cache is True. Passing 0 for cache_max or max_slots selects the defaults computed in the source above.
cap_cache(self)

    Cap the cache size to self.cache_max.
reserve_cache(self, locator)

    Reserve a cache slot for the specified locator, or return the existing slot. Returns a (slot, first) pair; first is True when a fresh, empty slot was created and the caller is responsible for filling it.
set(self, slot, blob)

    Store blob in the given cache slot. If the first attempt fails for lack of memory or disk space, the cache limits are adjusted downward and the write is retried once; a second failure raises arvados.errors.KeepCacheError.
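A minimal sketch of the reserve/fill/read protocol implemented by the methods above; the locator and payload are hypothetical:

    cache = KeepBlockCache()
    locator = 'd41d8cd98f00b204e9800998ecf8427e'  # hypothetical md5 locator
    slot, first = cache.reserve_cache(locator)
    if first:
        # This thread owns the empty slot and must fill it,
        # e.g. with the body of a Keep GET response.
        cache.set(slot, b'block data')
    data = slot.get()  # blocks until some thread has filled the slot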
class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class _KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A _KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service.  This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures.  However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super().__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            # Reuse a pooled curl handle if one is available;
            # otherwise create a fresh one.
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method == "HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request;
                    # return the local copy's block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k, v) for k, v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True


    class _KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self)  # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # has been disabled.
                    return None
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class _KeepWriterThreadPool:
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient._KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response


    class _KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError):
            """Exception for failed Keep writes

            TODO: Move this class to the module top level and document it

            @private
            """

        def __init__(self, queue, data, data_hash, timeout=None):
            super().__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in _KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("_KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # The storage-classes-confirmed header is missing or corrupt.
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you do not want KeepClient to use a proxy, pass in an empty
          string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds.  Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth).  The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = _Counter()
        self.download_counter = _Counter()
        self.put_counter = _Counter()
        self.get_counter = _Counter()
        self.hits_counter = _Counter()
        self.misses_counter = _Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # _KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to _KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "Bearer %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self._KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache; otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature."""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then to each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # This is a prefetch request to fill in the
                        # cache; we don't need to wait for the result,
                        # so if the fetch is already in flight, return
                        # immediately.  Clear 'slot' to prevent the
                        # finally block from calling slot.set().
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:])
                               )])
            # Map root URLs to their _KeepService objects.
            roots_map = {
                root: self._KeepService(root, self._user_agent_pool,
                                        upload_counter=self.upload_counter,
                                        download_counter=self.download_counter,
                                        headers=headers,
                                        insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            roots_map = {}
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query _KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient._KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done_copies,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left),
                classes=pending_classes,
            )
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self.get(b, prefetch=True)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    def _start_prefetch_threads(self):
        if self._prefetch_threads is None:
            with self.lock:
                if self._prefetch_threads is not None:
                    return
                self._prefetch_queue = queue.Queue()
                self._prefetch_threads = []
                for i in range(0, self.num_prefetch_threads):
                    thread = threading.Thread(target=self._block_prefetch_worker)
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def block_prefetch(self, locator):
        """Queue a block for download by a background prefetch thread.

        This relies on the fact that KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache).  This method
        does not block.
        """

        if self.block_cache.get(locator) is not None:
            return

        self._start_prefetch_threads()
        self._prefetch_queue.put(locator)

    def stop_prefetch_threads(self):
        with self.lock:
            if self._prefetch_threads is not None:
                for t in self._prefetch_threads:
                    self._prefetch_queue.put(None)
                for t in self._prefetch_threads:
                    t.join()
            self._prefetch_threads = None
            self._prefetch_queue = None

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True
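To illustrate the prefetch API above, a brief sketch assuming the client can reach a configured cluster; the locator is a placeholder:

    kc = KeepClient()
    loc = 'acbd18db4cc2f85cedef654fccc4a4d8+3'  # hypothetical block locator
    kc.block_prefetch(loc)        # queue a background download; does not block
    data = kc.get(loc)            # served from the block cache once fetched
    kc.stop_prefetch_threads()    # shut down the workers when finished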
__init__(self, api_client=None, proxy=None, timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT, api_token=None, local_store=None, block_cache=None, num_retries=10, session=None, num_prefetch_threads=None)
    Initialize a new KeepClient.

    Arguments:

    api_client: The API client to use to find Keep services. If not provided, KeepClient will build one from available Arvados configuration.

    proxy: If specified, this KeepClient will send requests to this Keep proxy. Otherwise, KeepClient will fall back to the setting of the ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings. If you do not want KeepClient to use a proxy, pass in an empty string.

    timeout: The initial timeout (in seconds) for HTTP requests to Keep non-proxy servers. A tuple of three floats is interpreted as (connection_timeout, read_timeout, minimum_bandwidth). A connection will be aborted if the average traffic rate falls below minimum_bandwidth bytes per second over an interval of read_timeout seconds. Because timeouts are often a result of transient server load, the actual connection timeout will be increased by a factor of two on each retry. Default: (2, 256, 32768).

    proxy_timeout: The initial timeout (in seconds) for HTTP requests to Keep proxies. A tuple of three floats is interpreted as (connection_timeout, read_timeout, minimum_bandwidth). The behavior described above for adjusting connection timeouts on retry also applies. Default: (20, 256, 32768).

    api_token: If you're not using an API client, but only talking directly to a Keep proxy, this parameter specifies an API token to authenticate Keep requests. It is an error to specify both api_client and api_token. If you specify neither, KeepClient will use one available from the Arvados configuration.

    local_store: If specified, this KeepClient will bypass Keep services, and save data to the named directory. If unspecified, KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE environment variable. If you want to ensure KeepClient does not use local storage, pass in an empty string. This is primarily intended to mock a server for testing.

    num_retries: The default number of times to retry failed requests. This will be used as the default num_retries value when get() and put() are called. Default 10.
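A short sketch of the construction options described above; the proxy URL and token are placeholders:

    # Find the API server and Keep services via ARVADOS_* configuration.
    kc = KeepClient()

    # Talk only to an explicit Keep proxy, authenticating with a token.
    kc = KeepClient(proxy='https://keep.zzzzz.example.com/',
                    api_token='hypothetical-token')

    # Bypass Keep entirely and store blocks in a local directory
    # (primarily useful for tests).
    kc = KeepClient(local_store='/tmp/keep-local')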
current_timeout(self, attempt_number)
    Return the appropriate timeout to use for this client: the proxy timeout setting if the backend service is currently a proxy, the regular timeout setting otherwise. The attempt_number indicates how many times the operation has been tried already (starting from 0 for the first try), and scales the connection timeout portion of the return value accordingly.
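For example, with the default non-proxy timeout of (2, 256, 32768), only the connection timeout scales with the attempt number:

    kc = KeepClient()
    kc.current_timeout(0)  # (2, 256, 32768)
    kc.current_timeout(1)  # (4, 256, 32768)
    kc.current_timeout(2)  # (8, 256, 32768)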
build_services_list(self, force_rebuild=False)

    Fetch the list of accessible Keep services from the API server (unless a static list was configured), precompute each service's base URI, and split the results into readable and writable service lists.
weighted_service_roots(self, locator, force_rebuild=False, need_writable=False)

    Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing data with the given hash+hints.
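The ordering is rendezvous hashing over service UUIDs, as implemented by _service_weight() in the source above. A standalone sketch with made-up UUIDs:

    import hashlib

    def service_weight(data_hash, service_uuid):
        # md5 of the block hash plus the last 15 characters of the UUID
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

    block_hash = 'acbd18db4cc2f85cedef654fccc4a4d8'   # hypothetical
    uuids = ['zzzzz-bi6l4-000000000000001',           # hypothetical
             'zzzzz-bi6l4-000000000000002']
    probe_order = sorted(uuids, reverse=True,
                         key=lambda u: service_weight(block_hash, u))

Because every client ranks services the same way for a given block, they all probe in the same order without coordinating, and adding a service only reshuffles the blocks that move to it.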
map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers)

    Poll for Keep services and add a _KeepService entry to roots_map for any new ones; roots_map maps Keep service root strings to _KeepService objects. Returns the current list of local root strings.
    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None
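A hedged usage sketch: get_from_cache() lets a caller consume already-downloaded data without triggering network I/O. kc is an assumed KeepClient and the locator is hypothetical:

import arvados

kc = arvados.KeepClient()       # assumes standard ARVADOS_API_* configuration
loc = 'acbd18db4cc2f85cedef654fccc4a4d8+3'    # hypothetical locator
data = kc.get_from_cache(loc)
if data is None:
    data = kc.get(loc)          # may contact Keep services over the network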
    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature."""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
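A usage sketch under assumptions: kc is a configured KeepClient, the locator below is fabricated, and (per the docstring above) refresh_signature() returns the block's fresh local signature:

import arvados
from arvados.keep import KeepLocator

kc = arvados.KeepClient()       # assumes standard ARVADOS_API_* configuration
# Fabricated locator with an expired +A permission hint (epoch timestamp).
old = 'acbd18db4cc2f85cedef654fccc4a4d8+3+A{}@00000000'.format('0' * 40)
if KeepLocator(old).permission_expired():
    fresh = kc.refresh_signature(old)   # re-signed by the cluster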
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The data to upload, as bytes or a string (strings are
          UTF-8 encoded before hashing and upload).
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient._KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done_copies,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left),
                classes=pending_classes,
            )
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()),
                service_errors, label="service")
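A short usage sketch of put(); the 'archive' storage class name is hypothetical and must be configured on the cluster for the call to succeed:

import arvados
import arvados.errors
from arvados.keep import KeepLocator

kc = arvados.KeepClient()       # assumes standard ARVADOS_API_* configuration
try:
    locator = kc.put(b'research data', copies=2, classes=['archive'])
except arvados.errors.KeepWriteError as err:
    print("write failed:", err)
else:
    print("stored", KeepLocator(locator).size, "bytes at", locator)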
    def block_prefetch(self, locator):
        """Queue a background download of the given block, if it is not
        already cached.

        This relies on the fact that KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.)  This method
        does not block.
        """

        if self.block_cache.get(locator) is not None:
            return

        self._start_prefetch_threads()
        self._prefetch_queue.put(locator)
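A usage sketch: warm the block cache before sequential reads. The locators are hypothetical; block_prefetch() returns immediately, so the later get() calls are likely to be served from the cache:

import arvados

kc = arvados.KeepClient()       # assumes standard ARVADOS_API_* configuration
locators = ['acbd18db4cc2f85cedef654fccc4a4d8+3',   # hypothetical locators
            '37b51d194a7513e45b56f6524f2d51f2+3']
for loc in locators:
    kc.block_prefetch(loc)      # returns immediately; download runs in background
for loc in locators:
    data = kc.get(loc)          # likely a cache hit by now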
    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()
    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True
        # Note: falls through and returns None (falsy) when the block is
        # absent from the local store.
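To tie the three local_store_* stubs together, a roundtrip sketch, assuming (per the constructor's local_store argument noted above) that building a KeepClient with local_store set dispatches put(), get(), and head() to these stubs; the directory path is made up:

import os
import arvados

store = '/tmp/keep-local'            # hypothetical directory
os.makedirs(store, exist_ok=True)
kc = arvados.KeepClient(local_store=store)
loc = kc.put(b'foo')                 # dispatched to local_store_put()
assert kc.get(loc) == b'foo'         # dispatched to local_store_get()
assert kc.head(loc) is True          # dispatched to local_store_head()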