arvados.keep
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4 5import copy 6import collections 7import datetime 8import hashlib 9import errno 10import io 11import logging 12import math 13import os 14import pycurl 15import queue 16import re 17import socket 18import ssl 19import sys 20import threading 21import resource 22import urllib.parse 23import traceback 24import weakref 25 26from io import BytesIO 27 28import arvados 29import arvados.config as config 30import arvados.errors 31import arvados.retry as retry 32import arvados.util 33 34from ._internal import basedirs, diskcache, Timer 35from ._internal.pycurl import PyCurlHelper 36 37_logger = logging.getLogger('arvados.keep') 38global_client_object = None 39 40# Monkey patch TCP constants when not available (apple). Values sourced from: 41# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h 42if sys.platform == 'darwin': 43 if not hasattr(socket, 'TCP_KEEPALIVE'): 44 socket.TCP_KEEPALIVE = 0x010 45 if not hasattr(socket, 'TCP_KEEPINTVL'): 46 socket.TCP_KEEPINTVL = 0x101 47 if not hasattr(socket, 'TCP_KEEPCNT'): 48 socket.TCP_KEEPCNT = 0x102 49 50class KeepLocator(object): 51 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0) 52 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$') 53 54 def __init__(self, locator_str): 55 self.hints = [] 56 self._perm_sig = None 57 self._perm_expiry = None 58 pieces = iter(locator_str.split('+')) 59 self.md5sum = next(pieces) 60 try: 61 self.size = int(next(pieces)) 62 except StopIteration: 63 self.size = None 64 for hint in pieces: 65 if self.HINT_RE.match(hint) is None: 66 raise ValueError("invalid hint format: {}".format(hint)) 67 elif hint.startswith('A'): 68 self.parse_permission_hint(hint) 69 else: 70 self.hints.append(hint) 71 72 def __str__(self): 73 return '+'.join( 74 str(s) 75 for s in [self.md5sum, self.size, 76 self.permission_hint()] + self.hints 77 if s is not None) 78 79 def stripped(self): 80 if self.size is not None: 81 return "%s+%i" % (self.md5sum, self.size) 82 else: 83 return self.md5sum 84 85 def _make_hex_prop(name, length): 86 # Build and return a new property with the given name that 87 # must be a hex string of the given length. 88 data_name = '_{}'.format(name) 89 def getter(self): 90 return getattr(self, data_name) 91 def setter(self, hex_str): 92 if not arvados.util.is_hex(hex_str, length): 93 raise ValueError("{} is not a {}-digit hex string: {!r}". 94 format(name, length, hex_str)) 95 setattr(self, data_name, hex_str) 96 return property(getter, setter) 97 98 md5sum = _make_hex_prop('md5sum', 32) 99 perm_sig = _make_hex_prop('perm_sig', 40) 100 101 @property 102 def perm_expiry(self): 103 return self._perm_expiry 104 105 @perm_expiry.setter 106 def perm_expiry(self, value): 107 if not arvados.util.is_hex(value, 1, 8): 108 raise ValueError( 109 "permission timestamp must be a hex Unix timestamp: {}". 110 format(value)) 111 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16)) 112 113 def permission_hint(self): 114 data = [self.perm_sig, self.perm_expiry] 115 if None in data: 116 return None 117 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds()) 118 return "A{}@{:08x}".format(*data) 119 120 def parse_permission_hint(self, s): 121 try: 122 self.perm_sig, self.perm_expiry = s[1:].split('@', 1) 123 except IndexError: 124 raise ValueError("bad permission hint {}".format(s)) 125 126 def permission_expired(self, as_of_dt=None): 127 if self.perm_expiry is None: 128 return False 129 elif as_of_dt is None: 130 as_of_dt = datetime.datetime.now() 131 return self.perm_expiry <= as_of_dt 132 133 134class KeepBlockCache(object): 135 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None): 136 self.cache_max = cache_max 137 self._cache = collections.OrderedDict() 138 self._cache_lock = threading.Lock() 139 self._max_slots = max_slots 140 self._disk_cache = disk_cache 141 self._disk_cache_dir = disk_cache_dir 142 self._cache_updating = threading.Condition(self._cache_lock) 143 144 if self._disk_cache and self._disk_cache_dir is None: 145 self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep')) 146 147 if self._max_slots == 0: 148 if self._disk_cache: 149 # Each block uses two file descriptors, one used to 150 # open it initially and hold the flock(), and a second 151 # hidden one used by mmap(). 152 # 153 # Set max slots to 1/8 of maximum file handles. This 154 # means we'll use at most 1/4 of total file handles. 155 # 156 # NOFILE typically defaults to 1024 on Linux so this 157 # is 128 slots (256 file handles), which means we can 158 # cache up to 8 GiB of 64 MiB blocks. This leaves 159 # 768 file handles for sockets and other stuff. 160 # 161 # When we want the ability to have more cache (e.g. in 162 # arv-mount) we'll increase rlimit before calling 163 # this. 164 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8) 165 else: 166 # RAM cache slots 167 self._max_slots = 512 168 169 if self.cache_max == 0: 170 if self._disk_cache: 171 fs = os.statvfs(self._disk_cache_dir) 172 # Calculation of available space incorporates existing cache usage 173 existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir) 174 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4 175 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10) 176 # pick smallest of: 177 # 10% of total disk size 178 # 25% of available space 179 # max_slots * 64 MiB 180 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024)) 181 else: 182 # 256 MiB in RAM 183 self.cache_max = (256 * 1024 * 1024) 184 185 self.cache_max = max(self.cache_max, 64 * 1024 * 1024) 186 187 self.cache_total = 0 188 if self._disk_cache: 189 self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots) 190 for slot in self._cache.values(): 191 self.cache_total += slot.size() 192 self.cap_cache() 193 194 class _CacheSlot: 195 __slots__ = ("locator", "ready", "content") 196 197 def __init__(self, locator): 198 self.locator = locator 199 self.ready = threading.Event() 200 self.content = None 201 202 def get(self): 203 self.ready.wait() 204 return self.content 205 206 def set(self, value): 207 if self.content is not None: 208 return False 209 self.content = value 210 self.ready.set() 211 return True 212 213 def size(self): 214 if self.content is None: 215 return 0 216 else: 217 return len(self.content) 218 219 def evict(self): 220 self.content = None 221 222 223 def _resize_cache(self, cache_max, max_slots): 224 # Try and make sure the contents of the cache do not exceed 225 # the supplied maximums. 226 227 if self.cache_total <= cache_max and len(self._cache) <= max_slots: 228 return 229 230 _evict_candidates = collections.deque(self._cache.values()) 231 while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots): 232 slot = _evict_candidates.popleft() 233 if not slot.ready.is_set(): 234 continue 235 236 sz = slot.size() 237 slot.evict() 238 self.cache_total -= sz 239 del self._cache[slot.locator] 240 241 242 def cap_cache(self): 243 '''Cap the cache size to self.cache_max''' 244 with self._cache_updating: 245 self._resize_cache(self.cache_max, self._max_slots) 246 self._cache_updating.notify_all() 247 248 def _get(self, locator): 249 # Test if the locator is already in the cache 250 if locator in self._cache: 251 n = self._cache[locator] 252 if n.ready.is_set() and n.content is None: 253 del self._cache[n.locator] 254 return None 255 self._cache.move_to_end(locator) 256 return n 257 if self._disk_cache: 258 # see if it exists on disk 259 n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir) 260 if n is not None: 261 self._cache[n.locator] = n 262 self.cache_total += n.size() 263 return n 264 return None 265 266 def get(self, locator): 267 with self._cache_lock: 268 return self._get(locator) 269 270 def reserve_cache(self, locator): 271 '''Reserve a cache slot for the specified locator, 272 or return the existing slot.''' 273 with self._cache_updating: 274 n = self._get(locator) 275 if n: 276 return n, False 277 else: 278 # Add a new cache slot for the locator 279 self._resize_cache(self.cache_max, self._max_slots-1) 280 while len(self._cache) >= self._max_slots: 281 # If there isn't a slot available, need to wait 282 # for something to happen that releases one of the 283 # cache slots. Idle for 200 ms or woken up by 284 # another thread 285 self._cache_updating.wait(timeout=0.2) 286 self._resize_cache(self.cache_max, self._max_slots-1) 287 288 if self._disk_cache: 289 n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir) 290 else: 291 n = KeepBlockCache._CacheSlot(locator) 292 self._cache[n.locator] = n 293 return n, True 294 295 def set(self, slot, blob): 296 try: 297 if slot.set(blob): 298 self.cache_total += slot.size() 299 return 300 except OSError as e: 301 if e.errno == errno.ENOMEM: 302 # Reduce max slots to current - 4, cap cache and retry 303 with self._cache_lock: 304 self._max_slots = max(4, len(self._cache) - 4) 305 elif e.errno == errno.ENOSPC: 306 # Reduce disk max space to current - 256 MiB, cap cache and retry 307 with self._cache_lock: 308 sm = sum(st.size() for st in self._cache.values()) 309 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024)) 310 elif e.errno == errno.ENODEV: 311 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.") 312 except Exception as e: 313 pass 314 finally: 315 # Check if we should evict things from the cache. Either 316 # because we added a new thing or there was an error and 317 # we possibly adjusted the limits down, so we might need 318 # to push something out. 319 self.cap_cache() 320 321 try: 322 # Only gets here if there was an error the first time. The 323 # exception handler adjusts limits downward in some cases 324 # to free up resources, which would make the operation 325 # succeed. 326 if slot.set(blob): 327 self.cache_total += slot.size() 328 except Exception as e: 329 # It failed again. Give up. 330 slot.set(None) 331 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e)) 332 333 self.cap_cache() 334 335 def clear(self): 336 with self._cache_lock: 337 self._cache.clear() 338 self.cache_total = 0 339 340class _Counter: 341 def __init__(self, v=0): 342 self._lk = threading.Lock() 343 self._val = v 344 345 def add(self, v): 346 with self._lk: 347 self._val += v 348 349 def get(self): 350 with self._lk: 351 return self._val 352 353 354class KeepClient(object): 355 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT 356 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT 357 358 class _KeepService(PyCurlHelper): 359 """Make requests to a single Keep service, and track results. 360 361 A _KeepService is intended to last long enough to perform one 362 transaction (GET or PUT) against one Keep service. This can 363 involve calling either get() or put() multiple times in order 364 to retry after transient failures. However, calling both get() 365 and put() on a single instance -- or using the same instance 366 to access two different Keep services -- will not produce 367 sensible behavior. 368 """ 369 370 HTTP_ERRORS = ( 371 socket.error, 372 ssl.SSLError, 373 arvados.errors.HttpError, 374 ) 375 376 def __init__(self, root, user_agent_pool=queue.LifoQueue(), 377 upload_counter=None, 378 download_counter=None, 379 headers={}, 380 insecure=False): 381 super().__init__() 382 self.root = root 383 self._user_agent_pool = user_agent_pool 384 self._result = {'error': None} 385 self._usable = True 386 self._session = None 387 self._socket = None 388 self.get_headers = {'Accept': 'application/octet-stream'} 389 self.get_headers.update(headers) 390 self.put_headers = headers 391 self.upload_counter = upload_counter 392 self.download_counter = download_counter 393 self.insecure = insecure 394 395 def usable(self): 396 """Is it worth attempting a request?""" 397 return self._usable 398 399 def finished(self): 400 """Did the request succeed or encounter permanent failure?""" 401 return self._result['error'] == False or not self._usable 402 403 def last_result(self): 404 return self._result 405 406 def _get_user_agent(self): 407 try: 408 return self._user_agent_pool.get(block=False) 409 except queue.Empty: 410 return pycurl.Curl() 411 412 def _put_user_agent(self, ua): 413 try: 414 ua.reset() 415 self._user_agent_pool.put(ua, block=False) 416 except: 417 ua.close() 418 419 def get(self, locator, method="GET", timeout=None): 420 # locator is a KeepLocator object. 421 url = self.root + str(locator) 422 _logger.debug("Request: %s %s", method, url) 423 curl = self._get_user_agent() 424 ok = None 425 try: 426 with Timer() as t: 427 self._headers = {} 428 response_body = BytesIO() 429 curl.setopt(pycurl.NOSIGNAL, 1) 430 curl.setopt(pycurl.OPENSOCKETFUNCTION, 431 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 432 curl.setopt(pycurl.URL, url.encode('utf-8')) 433 curl.setopt(pycurl.HTTPHEADER, [ 434 '{}: {}'.format(k,v) for k,v in self.get_headers.items()]) 435 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 436 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 437 if self.insecure: 438 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 439 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 440 else: 441 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 442 if method == "HEAD": 443 curl.setopt(pycurl.NOBODY, True) 444 else: 445 curl.setopt(pycurl.HTTPGET, True) 446 self._setcurltimeouts(curl, timeout, method=="HEAD") 447 448 try: 449 curl.perform() 450 except Exception as e: 451 raise arvados.errors.HttpError(0, str(e)) 452 finally: 453 if self._socket: 454 self._socket.close() 455 self._socket = None 456 self._result = { 457 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 458 'body': response_body.getvalue(), 459 'headers': self._headers, 460 'error': False, 461 } 462 463 ok = retry.check_http_response_success(self._result['status_code']) 464 if not ok: 465 self._result['error'] = arvados.errors.HttpError( 466 self._result['status_code'], 467 self._headers.get('x-status-line', 'Error')) 468 except self.HTTP_ERRORS as e: 469 self._result = { 470 'error': e, 471 } 472 self._usable = ok != False 473 if self._result.get('status_code', None): 474 # The client worked well enough to get an HTTP status 475 # code, so presumably any problems are just on the 476 # server side and it's OK to reuse the client. 477 self._put_user_agent(curl) 478 else: 479 # Don't return this client to the pool, in case it's 480 # broken. 481 curl.close() 482 if not ok: 483 _logger.debug("Request fail: GET %s => %s: %s", 484 url, type(self._result['error']), str(self._result['error'])) 485 return None 486 if method == "HEAD": 487 _logger.info("HEAD %s: %s bytes", 488 self._result['status_code'], 489 self._result.get('content-length')) 490 if self._result['headers'].get('x-keep-locator'): 491 # This is a response to a remote block copy request, return 492 # the local copy block locator. 493 return self._result['headers'].get('x-keep-locator') 494 return True 495 496 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)", 497 self._result['status_code'], 498 len(self._result['body']), 499 t.msecs, 500 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0) 501 502 if self.download_counter: 503 self.download_counter.add(len(self._result['body'])) 504 resp_md5 = hashlib.md5(self._result['body']).hexdigest() 505 if resp_md5 != locator.md5sum: 506 _logger.warning("Checksum fail: md5(%s) = %s", 507 url, resp_md5) 508 self._result['error'] = arvados.errors.HttpError( 509 0, 'Checksum fail') 510 return None 511 return self._result['body'] 512 513 def put(self, hash_s, body, timeout=None, headers={}): 514 put_headers = copy.copy(self.put_headers) 515 put_headers.update(headers) 516 url = self.root + hash_s 517 _logger.debug("Request: PUT %s", url) 518 curl = self._get_user_agent() 519 ok = None 520 try: 521 with Timer() as t: 522 self._headers = {} 523 body_reader = BytesIO(body) 524 response_body = BytesIO() 525 curl.setopt(pycurl.NOSIGNAL, 1) 526 curl.setopt(pycurl.OPENSOCKETFUNCTION, 527 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 528 curl.setopt(pycurl.URL, url.encode('utf-8')) 529 # Using UPLOAD tells cURL to wait for a "go ahead" from the 530 # Keep server (in the form of a HTTP/1.1 "100 Continue" 531 # response) instead of sending the request body immediately. 532 # This allows the server to reject the request if the request 533 # is invalid or the server is read-only, without waiting for 534 # the client to send the entire block. 535 curl.setopt(pycurl.UPLOAD, True) 536 curl.setopt(pycurl.INFILESIZE, len(body)) 537 curl.setopt(pycurl.READFUNCTION, body_reader.read) 538 curl.setopt(pycurl.HTTPHEADER, [ 539 '{}: {}'.format(k,v) for k,v in put_headers.items()]) 540 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 541 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 542 if self.insecure: 543 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 544 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 545 else: 546 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 547 self._setcurltimeouts(curl, timeout) 548 try: 549 curl.perform() 550 except Exception as e: 551 raise arvados.errors.HttpError(0, str(e)) 552 finally: 553 if self._socket: 554 self._socket.close() 555 self._socket = None 556 self._result = { 557 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 558 'body': response_body.getvalue().decode('utf-8'), 559 'headers': self._headers, 560 'error': False, 561 } 562 ok = retry.check_http_response_success(self._result['status_code']) 563 if not ok: 564 self._result['error'] = arvados.errors.HttpError( 565 self._result['status_code'], 566 self._headers.get('x-status-line', 'Error')) 567 except self.HTTP_ERRORS as e: 568 self._result = { 569 'error': e, 570 } 571 self._usable = ok != False # still usable if ok is True or None 572 if self._result.get('status_code', None): 573 # Client is functional. See comment in get(). 574 self._put_user_agent(curl) 575 else: 576 curl.close() 577 if not ok: 578 _logger.debug("Request fail: PUT %s => %s: %s", 579 url, type(self._result['error']), str(self._result['error'])) 580 return False 581 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)", 582 self._result['status_code'], 583 len(body), 584 t.msecs, 585 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0) 586 if self.upload_counter: 587 self.upload_counter.add(len(body)) 588 return True 589 590 591 class _KeepWriterQueue(queue.Queue): 592 def __init__(self, copies, classes=[]): 593 queue.Queue.__init__(self) # Old-style superclass 594 self.wanted_copies = copies 595 self.wanted_storage_classes = classes 596 self.successful_copies = 0 597 self.confirmed_storage_classes = {} 598 self.response = None 599 self.storage_classes_tracking = True 600 self.queue_data_lock = threading.RLock() 601 self.pending_tries = max(copies, len(classes)) 602 self.pending_tries_notification = threading.Condition() 603 604 def write_success(self, response, replicas_nr, classes_confirmed): 605 with self.queue_data_lock: 606 self.successful_copies += replicas_nr 607 if classes_confirmed is None: 608 self.storage_classes_tracking = False 609 elif self.storage_classes_tracking: 610 for st_class, st_copies in classes_confirmed.items(): 611 try: 612 self.confirmed_storage_classes[st_class] += st_copies 613 except KeyError: 614 self.confirmed_storage_classes[st_class] = st_copies 615 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes())) 616 self.response = response 617 with self.pending_tries_notification: 618 self.pending_tries_notification.notify_all() 619 620 def write_fail(self, ks): 621 with self.pending_tries_notification: 622 self.pending_tries += 1 623 self.pending_tries_notification.notify() 624 625 def pending_copies(self): 626 with self.queue_data_lock: 627 return self.wanted_copies - self.successful_copies 628 629 def satisfied_classes(self): 630 with self.queue_data_lock: 631 if not self.storage_classes_tracking: 632 # Notifies disabled storage classes expectation to 633 # the outer loop. 634 return None 635 return list(set(self.wanted_storage_classes) - set(self.pending_classes())) 636 637 def pending_classes(self): 638 with self.queue_data_lock: 639 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None): 640 return [] 641 unsatisfied_classes = copy.copy(self.wanted_storage_classes) 642 for st_class, st_copies in self.confirmed_storage_classes.items(): 643 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies: 644 unsatisfied_classes.remove(st_class) 645 return unsatisfied_classes 646 647 def get_next_task(self): 648 with self.pending_tries_notification: 649 while True: 650 if self.pending_copies() < 1 and len(self.pending_classes()) == 0: 651 # This notify_all() is unnecessary -- 652 # write_success() already called notify_all() 653 # when pending<1 became true, so it's not 654 # possible for any other thread to be in 655 # wait() now -- but it's cheap insurance 656 # against deadlock so we do it anyway: 657 self.pending_tries_notification.notify_all() 658 # Drain the queue and then raise Queue.Empty 659 while True: 660 self.get_nowait() 661 self.task_done() 662 elif self.pending_tries > 0: 663 service, service_root = self.get_nowait() 664 if service.finished(): 665 self.task_done() 666 continue 667 self.pending_tries -= 1 668 return service, service_root 669 elif self.empty(): 670 self.pending_tries_notification.notify_all() 671 raise queue.Empty 672 else: 673 self.pending_tries_notification.wait() 674 675 676 class _KeepWriterThreadPool: 677 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]): 678 self.total_task_nr = 0 679 if (not max_service_replicas) or (max_service_replicas >= copies): 680 num_threads = 1 681 else: 682 num_threads = int(math.ceil(1.0*copies/max_service_replicas)) 683 _logger.debug("Pool max threads is %d", num_threads) 684 self.workers = [] 685 self.queue = KeepClient._KeepWriterQueue(copies, classes) 686 # Create workers 687 for _ in range(num_threads): 688 w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout) 689 self.workers.append(w) 690 691 def add_task(self, ks, service_root): 692 self.queue.put((ks, service_root)) 693 self.total_task_nr += 1 694 695 def done(self): 696 return self.queue.successful_copies, self.queue.satisfied_classes() 697 698 def join(self): 699 # Start workers 700 for worker in self.workers: 701 worker.start() 702 # Wait for finished work 703 self.queue.join() 704 705 def response(self): 706 return self.queue.response 707 708 709 class _KeepWriterThread(threading.Thread): 710 class TaskFailed(RuntimeError): 711 """Exception for failed Keep writes 712 713 TODO: Move this class to the module top level and document it 714 715 @private 716 """ 717 718 719 def __init__(self, queue, data, data_hash, timeout=None): 720 super().__init__() 721 self.timeout = timeout 722 self.queue = queue 723 self.data = data 724 self.data_hash = data_hash 725 self.daemon = True 726 727 def run(self): 728 while True: 729 try: 730 service, service_root = self.queue.get_next_task() 731 except queue.Empty: 732 return 733 try: 734 locator, copies, classes = self.do_task(service, service_root) 735 except Exception as e: 736 if not isinstance(e, self.TaskFailed): 737 _logger.exception("Exception in _KeepWriterThread") 738 self.queue.write_fail(service) 739 else: 740 self.queue.write_success(locator, copies, classes) 741 finally: 742 self.queue.task_done() 743 744 def do_task(self, service, service_root): 745 classes = self.queue.pending_classes() 746 headers = {} 747 if len(classes) > 0: 748 classes.sort() 749 headers['X-Keep-Storage-Classes'] = ', '.join(classes) 750 success = bool(service.put(self.data_hash, 751 self.data, 752 timeout=self.timeout, 753 headers=headers)) 754 result = service.last_result() 755 756 if not success: 757 if result.get('status_code'): 758 _logger.debug("Request fail: PUT %s => %s %s", 759 self.data_hash, 760 result.get('status_code'), 761 result.get('body')) 762 raise self.TaskFailed() 763 764 _logger.debug("_KeepWriterThread %s succeeded %s+%i %s", 765 str(threading.current_thread()), 766 self.data_hash, 767 len(self.data), 768 service_root) 769 try: 770 replicas_stored = int(result['headers']['x-keep-replicas-stored']) 771 except (KeyError, ValueError): 772 replicas_stored = 1 773 774 classes_confirmed = {} 775 try: 776 scch = result['headers']['x-keep-storage-classes-confirmed'] 777 for confirmation in scch.replace(' ', '').split(','): 778 if '=' in confirmation: 779 stored_class, stored_copies = confirmation.split('=')[:2] 780 classes_confirmed[stored_class] = int(stored_copies) 781 except (KeyError, ValueError): 782 # Storage classes confirmed header missing or corrupt 783 classes_confirmed = None 784 785 return result['body'].strip(), replicas_stored, classes_confirmed 786 787 788 def __init__(self, api_client=None, proxy=None, 789 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT, 790 api_token=None, local_store=None, block_cache=None, 791 num_retries=10, session=None, num_prefetch_threads=None): 792 """Initialize a new KeepClient. 793 794 Arguments: 795 :api_client: 796 The API client to use to find Keep services. If not 797 provided, KeepClient will build one from available Arvados 798 configuration. 799 800 :proxy: 801 If specified, this KeepClient will send requests to this Keep 802 proxy. Otherwise, KeepClient will fall back to the setting of the 803 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings. 804 If you want to KeepClient does not use a proxy, pass in an empty 805 string. 806 807 :timeout: 808 The initial timeout (in seconds) for HTTP requests to Keep 809 non-proxy servers. A tuple of three floats is interpreted as 810 (connection_timeout, read_timeout, minimum_bandwidth). A connection 811 will be aborted if the average traffic rate falls below 812 minimum_bandwidth bytes per second over an interval of read_timeout 813 seconds. Because timeouts are often a result of transient server 814 load, the actual connection timeout will be increased by a factor 815 of two on each retry. 816 Default: (2, 256, 32768). 817 818 :proxy_timeout: 819 The initial timeout (in seconds) for HTTP requests to 820 Keep proxies. A tuple of three floats is interpreted as 821 (connection_timeout, read_timeout, minimum_bandwidth). The behavior 822 described above for adjusting connection timeouts on retry also 823 applies. 824 Default: (20, 256, 32768). 825 826 :api_token: 827 If you're not using an API client, but only talking 828 directly to a Keep proxy, this parameter specifies an API token 829 to authenticate Keep requests. It is an error to specify both 830 api_client and api_token. If you specify neither, KeepClient 831 will use one available from the Arvados configuration. 832 833 :local_store: 834 If specified, this KeepClient will bypass Keep 835 services, and save data to the named directory. If unspecified, 836 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE 837 environment variable. If you want to ensure KeepClient does not 838 use local storage, pass in an empty string. This is primarily 839 intended to mock a server for testing. 840 841 :num_retries: 842 The default number of times to retry failed requests. 843 This will be used as the default num_retries value when get() and 844 put() are called. Default 10. 845 """ 846 self.lock = threading.Lock() 847 if proxy is None: 848 if config.get('ARVADOS_KEEP_SERVICES'): 849 proxy = config.get('ARVADOS_KEEP_SERVICES') 850 else: 851 proxy = config.get('ARVADOS_KEEP_PROXY') 852 if api_token is None: 853 if api_client is None: 854 api_token = config.get('ARVADOS_API_TOKEN') 855 else: 856 api_token = api_client.api_token 857 elif api_client is not None: 858 raise ValueError( 859 "can't build KeepClient with both API client and token") 860 if local_store is None: 861 local_store = os.environ.get('KEEP_LOCAL_STORE') 862 863 if api_client is None: 864 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE') 865 else: 866 self.insecure = api_client.insecure 867 868 self.block_cache = block_cache if block_cache else KeepBlockCache() 869 self.timeout = timeout 870 self.proxy_timeout = proxy_timeout 871 self._user_agent_pool = queue.LifoQueue() 872 self.upload_counter = _Counter() 873 self.download_counter = _Counter() 874 self.put_counter = _Counter() 875 self.get_counter = _Counter() 876 self.hits_counter = _Counter() 877 self.misses_counter = _Counter() 878 self._storage_classes_unsupported_warning = False 879 self._default_classes = [] 880 if num_prefetch_threads is not None: 881 self.num_prefetch_threads = num_prefetch_threads 882 else: 883 self.num_prefetch_threads = 2 884 self._prefetch_queue = None 885 self._prefetch_threads = None 886 887 if local_store: 888 self.local_store = local_store 889 self.head = self.local_store_head 890 self.get = self.local_store_get 891 self.put = self.local_store_put 892 else: 893 self.num_retries = num_retries 894 self.max_replicas_per_service = None 895 if proxy: 896 proxy_uris = proxy.split() 897 for i in range(len(proxy_uris)): 898 if not proxy_uris[i].endswith('/'): 899 proxy_uris[i] += '/' 900 # URL validation 901 url = urllib.parse.urlparse(proxy_uris[i]) 902 if not (url.scheme and url.netloc): 903 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i])) 904 self.api_token = api_token 905 self._gateway_services = {} 906 self._keep_services = [{ 907 'uuid': "00000-bi6l4-%015d" % idx, 908 'service_type': 'proxy', 909 '_service_root': uri, 910 } for idx, uri in enumerate(proxy_uris)] 911 self._writable_services = self._keep_services 912 self.using_proxy = True 913 self._static_services_list = True 914 else: 915 # It's important to avoid instantiating an API client 916 # unless we actually need one, for testing's sake. 917 if api_client is None: 918 api_client = arvados.api('v1') 919 self.api_client = api_client 920 self.api_token = api_client.api_token 921 self._gateway_services = {} 922 self._keep_services = None 923 self._writable_services = None 924 self.using_proxy = None 925 self._static_services_list = False 926 try: 927 self._default_classes = [ 928 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']] 929 except KeyError: 930 # We're talking to an old cluster 931 pass 932 933 def current_timeout(self, attempt_number): 934 """Return the appropriate timeout to use for this client. 935 936 The proxy timeout setting if the backend service is currently a proxy, 937 the regular timeout setting otherwise. The `attempt_number` indicates 938 how many times the operation has been tried already (starting from 0 939 for the first try), and scales the connection timeout portion of the 940 return value accordingly. 941 942 """ 943 # TODO(twp): the timeout should be a property of a 944 # _KeepService, not a KeepClient. See #4488. 945 t = self.proxy_timeout if self.using_proxy else self.timeout 946 if len(t) == 2: 947 return (t[0] * (1 << attempt_number), t[1]) 948 else: 949 return (t[0] * (1 << attempt_number), t[1], t[2]) 950 def _any_nondisk_services(self, service_list): 951 return any(ks.get('service_type', 'disk') != 'disk' 952 for ks in service_list) 953 954 def build_services_list(self, force_rebuild=False): 955 if (self._static_services_list or 956 (self._keep_services and not force_rebuild)): 957 return 958 with self.lock: 959 try: 960 keep_services = self.api_client.keep_services().accessible() 961 except Exception: # API server predates Keep services. 962 keep_services = self.api_client.keep_disks().list() 963 964 # Gateway services are only used when specified by UUID, 965 # so there's nothing to gain by filtering them by 966 # service_type. 967 self._gateway_services = {ks['uuid']: ks for ks in 968 keep_services.execute()['items']} 969 if not self._gateway_services: 970 raise arvados.errors.NoKeepServersError() 971 972 # Precompute the base URI for each service. 973 for r in self._gateway_services.values(): 974 host = r['service_host'] 975 if not host.startswith('[') and host.find(':') >= 0: 976 # IPv6 URIs must be formatted like http://[::1]:80/... 977 host = '[' + host + ']' 978 r['_service_root'] = "{}://{}:{:d}/".format( 979 'https' if r['service_ssl_flag'] else 'http', 980 host, 981 r['service_port']) 982 983 _logger.debug(str(self._gateway_services)) 984 self._keep_services = [ 985 ks for ks in self._gateway_services.values() 986 if not ks.get('service_type', '').startswith('gateway:')] 987 self._writable_services = [ks for ks in self._keep_services 988 if not ks.get('read_only')] 989 990 # For disk type services, max_replicas_per_service is 1 991 # It is unknown (unlimited) for other service types. 992 if self._any_nondisk_services(self._writable_services): 993 self.max_replicas_per_service = None 994 else: 995 self.max_replicas_per_service = 1 996 997 def _service_weight(self, data_hash, service_uuid): 998 """Compute the weight of a Keep service endpoint for a data 999 block with a known hash. 1000 1001 The weight is md5(h + u) where u is the last 15 characters of 1002 the service endpoint's UUID. 1003 """ 1004 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest() 1005 1006 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False): 1007 """Return an array of Keep service endpoints, in the order in 1008 which they should be probed when reading or writing data with 1009 the given hash+hints. 1010 """ 1011 self.build_services_list(force_rebuild) 1012 1013 sorted_roots = [] 1014 # Use the services indicated by the given +K@... remote 1015 # service hints, if any are present and can be resolved to a 1016 # URI. 1017 for hint in locator.hints: 1018 if hint.startswith('K@'): 1019 if len(hint) == 7: 1020 sorted_roots.append( 1021 "https://keep.{}.arvadosapi.com/".format(hint[2:])) 1022 elif len(hint) == 29: 1023 svc = self._gateway_services.get(hint[2:]) 1024 if svc: 1025 sorted_roots.append(svc['_service_root']) 1026 1027 # Sort the available local services by weight (heaviest first) 1028 # for this locator, and return their service_roots (base URIs) 1029 # in that order. 1030 use_services = self._keep_services 1031 if need_writable: 1032 use_services = self._writable_services 1033 self.using_proxy = self._any_nondisk_services(use_services) 1034 sorted_roots.extend([ 1035 svc['_service_root'] for svc in sorted( 1036 use_services, 1037 reverse=True, 1038 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))]) 1039 _logger.debug("{}: {}".format(locator, sorted_roots)) 1040 return sorted_roots 1041 1042 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers): 1043 # roots_map is a dictionary, mapping Keep service root strings 1044 # to _KeepService objects. Poll for Keep services, and add any 1045 # new ones to roots_map. Return the current list of local 1046 # root strings. 1047 headers.setdefault('Authorization', "Bearer %s" % (self.api_token,)) 1048 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable) 1049 for root in local_roots: 1050 if root not in roots_map: 1051 roots_map[root] = self._KeepService( 1052 root, self._user_agent_pool, 1053 upload_counter=self.upload_counter, 1054 download_counter=self.download_counter, 1055 headers=headers, 1056 insecure=self.insecure) 1057 return local_roots 1058 1059 @staticmethod 1060 def _check_loop_result(result): 1061 # KeepClient RetryLoops should save results as a 2-tuple: the 1062 # actual result of the request, and the number of servers available 1063 # to receive the request this round. 1064 # This method returns True if there's a real result, False if 1065 # there are no more servers available, otherwise None. 1066 if isinstance(result, Exception): 1067 return None 1068 result, tried_server_count = result 1069 if (result is not None) and (result is not False): 1070 return True 1071 elif tried_server_count < 1: 1072 _logger.info("No more Keep services to try; giving up") 1073 return False 1074 else: 1075 return None 1076 1077 def get_from_cache(self, loc_s): 1078 """Fetch a block only if is in the cache, otherwise return None.""" 1079 locator = KeepLocator(loc_s) 1080 slot = self.block_cache.get(locator.md5sum) 1081 if slot is not None and slot.ready.is_set(): 1082 return slot.get() 1083 else: 1084 return None 1085 1086 def refresh_signature(self, loc): 1087 """Ask Keep to get the remote block and return its local signature""" 1088 now = datetime.datetime.utcnow().isoformat("T") + 'Z' 1089 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)}) 1090 1091 @retry.retry_method 1092 def head(self, loc_s, **kwargs): 1093 return self._get_or_head(loc_s, method="HEAD", **kwargs) 1094 1095 @retry.retry_method 1096 def get(self, loc_s, **kwargs): 1097 return self._get_or_head(loc_s, method="GET", **kwargs) 1098 1099 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False): 1100 """Get data from Keep. 1101 1102 This method fetches one or more blocks of data from Keep. It 1103 sends a request each Keep service registered with the API 1104 server (or the proxy provided when this client was 1105 instantiated), then each service named in location hints, in 1106 sequence. As soon as one service provides the data, it's 1107 returned. 1108 1109 Arguments: 1110 * loc_s: A string of one or more comma-separated locators to fetch. 1111 This method returns the concatenation of these blocks. 1112 * num_retries: The number of times to retry GET requests to 1113 *each* Keep server if it returns temporary failures, with 1114 exponential backoff. Note that, in each loop, the method may try 1115 to fetch data from every available Keep service, along with any 1116 that are named in location hints in the locator. The default value 1117 is set when the KeepClient is initialized. 1118 """ 1119 if ',' in loc_s: 1120 return ''.join(self.get(x) for x in loc_s.split(',')) 1121 1122 self.get_counter.add(1) 1123 1124 request_id = (request_id or 1125 (hasattr(self, 'api_client') and self.api_client.request_id) or 1126 arvados.util.new_request_id()) 1127 if headers is None: 1128 headers = {} 1129 headers['X-Request-Id'] = request_id 1130 1131 slot = None 1132 blob = None 1133 try: 1134 locator = KeepLocator(loc_s) 1135 if method == "GET": 1136 while slot is None: 1137 slot, first = self.block_cache.reserve_cache(locator.md5sum) 1138 if first: 1139 # Fresh and empty "first time it is used" slot 1140 break 1141 if prefetch: 1142 # this is request for a prefetch to fill in 1143 # the cache, don't need to wait for the 1144 # result, so if it is already in flight return 1145 # immediately. Clear 'slot' to prevent 1146 # finally block from calling slot.set() 1147 if slot.ready.is_set(): 1148 slot.get() 1149 slot = None 1150 return None 1151 1152 blob = slot.get() 1153 if blob is not None: 1154 self.hits_counter.add(1) 1155 return blob 1156 1157 # If blob is None, this means either 1158 # 1159 # (a) another thread was fetching this block and 1160 # failed with an error or 1161 # 1162 # (b) cache thrashing caused the slot to be 1163 # evicted (content set to None) by another thread 1164 # between the call to reserve_cache() and get(). 1165 # 1166 # We'll handle these cases by reserving a new slot 1167 # and then doing a full GET request. 1168 slot = None 1169 1170 self.misses_counter.add(1) 1171 1172 # If the locator has hints specifying a prefix (indicating a 1173 # remote keepproxy) or the UUID of a local gateway service, 1174 # read data from the indicated service(s) instead of the usual 1175 # list of local disk services. 1176 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:]) 1177 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7] 1178 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root'] 1179 for hint in locator.hints if ( 1180 hint.startswith('K@') and 1181 len(hint) == 29 and 1182 self._gateway_services.get(hint[2:]) 1183 )]) 1184 # Map root URLs to their _KeepService objects. 1185 roots_map = { 1186 root: self._KeepService(root, self._user_agent_pool, 1187 upload_counter=self.upload_counter, 1188 download_counter=self.download_counter, 1189 headers=headers, 1190 insecure=self.insecure) 1191 for root in hint_roots 1192 } 1193 1194 # See #3147 for a discussion of the loop implementation. Highlights: 1195 # * Refresh the list of Keep services after each failure, in case 1196 # it's being updated. 1197 # * Retry until we succeed, we're out of retries, or every available 1198 # service has returned permanent failure. 1199 sorted_roots = [] 1200 roots_map = {} 1201 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1202 backoff_start=2) 1203 for tries_left in loop: 1204 try: 1205 sorted_roots = self.map_new_services( 1206 roots_map, locator, 1207 force_rebuild=(tries_left < num_retries), 1208 need_writable=False, 1209 headers=headers) 1210 except Exception as error: 1211 loop.save_result(error) 1212 continue 1213 1214 # Query _KeepService objects that haven't returned 1215 # permanent failure, in our specified shuffle order. 1216 services_to_try = [roots_map[root] 1217 for root in sorted_roots 1218 if roots_map[root].usable()] 1219 for keep_service in services_to_try: 1220 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left)) 1221 if blob is not None: 1222 break 1223 loop.save_result((blob, len(services_to_try))) 1224 1225 # Always cache the result, then return it if we succeeded. 1226 if loop.success(): 1227 return blob 1228 finally: 1229 if slot is not None: 1230 self.block_cache.set(slot, blob) 1231 1232 # Q: Including 403 is necessary for the Keep tests to continue 1233 # passing, but maybe they should expect KeepReadError instead? 1234 not_founds = sum(1 for key in sorted_roots 1235 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410}) 1236 service_errors = ((key, roots_map[key].last_result()['error']) 1237 for key in sorted_roots) 1238 if not roots_map: 1239 raise arvados.errors.KeepReadError( 1240 "[{}] failed to read {}: no Keep services available ({})".format( 1241 request_id, loc_s, loop.last_result())) 1242 elif not_founds == len(sorted_roots): 1243 raise arvados.errors.NotFoundError( 1244 "[{}] {} not found".format(request_id, loc_s), service_errors) 1245 else: 1246 raise arvados.errors.KeepReadError( 1247 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service") 1248 1249 @retry.retry_method 1250 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None): 1251 """Save data in Keep. 1252 1253 This method will get a list of Keep services from the API server, and 1254 send the data to each one simultaneously in a new thread. Once the 1255 uploads are finished, if enough copies are saved, this method returns 1256 the most recent HTTP response body. If requests fail to upload 1257 enough copies, this method raises KeepWriteError. 1258 1259 Arguments: 1260 * data: The string of data to upload. 1261 * copies: The number of copies that the user requires be saved. 1262 Default 2. 1263 * num_retries: The number of times to retry PUT requests to 1264 *each* Keep server if it returns temporary failures, with 1265 exponential backoff. The default value is set when the 1266 KeepClient is initialized. 1267 * classes: An optional list of storage class names where copies should 1268 be written. 1269 """ 1270 1271 classes = classes or self._default_classes 1272 1273 if not isinstance(data, bytes): 1274 data = data.encode() 1275 1276 self.put_counter.add(1) 1277 1278 data_hash = hashlib.md5(data).hexdigest() 1279 loc_s = data_hash + '+' + str(len(data)) 1280 if copies < 1: 1281 return loc_s 1282 locator = KeepLocator(loc_s) 1283 1284 request_id = (request_id or 1285 (hasattr(self, 'api_client') and self.api_client.request_id) or 1286 arvados.util.new_request_id()) 1287 headers = { 1288 'X-Request-Id': request_id, 1289 'X-Keep-Desired-Replicas': str(copies), 1290 } 1291 roots_map = {} 1292 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1293 backoff_start=2) 1294 done_copies = 0 1295 done_classes = [] 1296 for tries_left in loop: 1297 try: 1298 sorted_roots = self.map_new_services( 1299 roots_map, locator, 1300 force_rebuild=(tries_left < num_retries), 1301 need_writable=True, 1302 headers=headers) 1303 except Exception as error: 1304 loop.save_result(error) 1305 continue 1306 1307 pending_classes = [] 1308 if done_classes is not None: 1309 pending_classes = list(set(classes) - set(done_classes)) 1310 writer_pool = KeepClient._KeepWriterThreadPool( 1311 data=data, 1312 data_hash=data_hash, 1313 copies=copies - done_copies, 1314 max_service_replicas=self.max_replicas_per_service, 1315 timeout=self.current_timeout(num_retries - tries_left), 1316 classes=pending_classes, 1317 ) 1318 for service_root, ks in [(root, roots_map[root]) 1319 for root in sorted_roots]: 1320 if ks.finished(): 1321 continue 1322 writer_pool.add_task(ks, service_root) 1323 writer_pool.join() 1324 pool_copies, pool_classes = writer_pool.done() 1325 done_copies += pool_copies 1326 if (done_classes is not None) and (pool_classes is not None): 1327 done_classes += pool_classes 1328 loop.save_result( 1329 (done_copies >= copies and set(done_classes) == set(classes), 1330 writer_pool.total_task_nr)) 1331 else: 1332 # Old keepstore contacted without storage classes support: 1333 # success is determined only by successful copies. 1334 # 1335 # Disable storage classes tracking from this point forward. 1336 if not self._storage_classes_unsupported_warning: 1337 self._storage_classes_unsupported_warning = True 1338 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster") 1339 done_classes = None 1340 loop.save_result( 1341 (done_copies >= copies, writer_pool.total_task_nr)) 1342 1343 if loop.success(): 1344 return writer_pool.response() 1345 if not roots_map: 1346 raise arvados.errors.KeepWriteError( 1347 "[{}] failed to write {}: no Keep services available ({})".format( 1348 request_id, data_hash, loop.last_result())) 1349 else: 1350 service_errors = ((key, roots_map[key].last_result()['error']) 1351 for key in sorted_roots 1352 if roots_map[key].last_result()['error']) 1353 raise arvados.errors.KeepWriteError( 1354 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format( 1355 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service") 1356 1357 def _block_prefetch_worker(self): 1358 """The background downloader thread.""" 1359 while True: 1360 try: 1361 b = self._prefetch_queue.get() 1362 if b is None: 1363 return 1364 self.get(b, prefetch=True) 1365 except Exception: 1366 _logger.exception("Exception doing block prefetch") 1367 1368 def _start_prefetch_threads(self): 1369 if self._prefetch_threads is None: 1370 with self.lock: 1371 if self._prefetch_threads is not None: 1372 return 1373 self._prefetch_queue = queue.Queue() 1374 self._prefetch_threads = [] 1375 for i in range(0, self.num_prefetch_threads): 1376 thread = threading.Thread(target=self._block_prefetch_worker) 1377 self._prefetch_threads.append(thread) 1378 thread.daemon = True 1379 thread.start() 1380 1381 def block_prefetch(self, locator): 1382 """ 1383 This relies on the fact that KeepClient implements a block cache, 1384 so repeated requests for the same block will not result in repeated 1385 downloads (unless the block is evicted from the cache.) This method 1386 does not block. 1387 """ 1388 1389 if self.block_cache.get(locator) is not None: 1390 return 1391 1392 self._start_prefetch_threads() 1393 self._prefetch_queue.put(locator) 1394 1395 def stop_prefetch_threads(self): 1396 with self.lock: 1397 if self._prefetch_threads is not None: 1398 for t in self._prefetch_threads: 1399 self._prefetch_queue.put(None) 1400 for t in self._prefetch_threads: 1401 t.join() 1402 self._prefetch_threads = None 1403 self._prefetch_queue = None 1404 1405 def local_store_put(self, data, copies=1, num_retries=None, classes=[]): 1406 """A stub for put(). 1407 1408 This method is used in place of the real put() method when 1409 using local storage (see constructor's local_store argument). 1410 1411 copies and num_retries arguments are ignored: they are here 1412 only for the sake of offering the same call signature as 1413 put(). 1414 1415 Data stored this way can be retrieved via local_store_get(). 1416 """ 1417 md5 = hashlib.md5(data).hexdigest() 1418 locator = '%s+%d' % (md5, len(data)) 1419 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f: 1420 f.write(data) 1421 os.rename(os.path.join(self.local_store, md5 + '.tmp'), 1422 os.path.join(self.local_store, md5)) 1423 return locator 1424 1425 def local_store_get(self, loc_s, num_retries=None): 1426 """Companion to local_store_put().""" 1427 try: 1428 locator = KeepLocator(loc_s) 1429 except ValueError: 1430 raise arvados.errors.NotFoundError( 1431 "Invalid data locator: '%s'" % loc_s) 1432 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1433 return b'' 1434 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f: 1435 return f.read() 1436 1437 def local_store_head(self, loc_s, num_retries=None): 1438 """Companion to local_store_put().""" 1439 try: 1440 locator = KeepLocator(loc_s) 1441 except ValueError: 1442 raise arvados.errors.NotFoundError( 1443 "Invalid data locator: '%s'" % loc_s) 1444 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1445 return True 1446 if os.path.exists(os.path.join(self.local_store, locator.md5sum)): 1447 return True
51class KeepLocator(object): 52 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0) 53 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$') 54 55 def __init__(self, locator_str): 56 self.hints = [] 57 self._perm_sig = None 58 self._perm_expiry = None 59 pieces = iter(locator_str.split('+')) 60 self.md5sum = next(pieces) 61 try: 62 self.size = int(next(pieces)) 63 except StopIteration: 64 self.size = None 65 for hint in pieces: 66 if self.HINT_RE.match(hint) is None: 67 raise ValueError("invalid hint format: {}".format(hint)) 68 elif hint.startswith('A'): 69 self.parse_permission_hint(hint) 70 else: 71 self.hints.append(hint) 72 73 def __str__(self): 74 return '+'.join( 75 str(s) 76 for s in [self.md5sum, self.size, 77 self.permission_hint()] + self.hints 78 if s is not None) 79 80 def stripped(self): 81 if self.size is not None: 82 return "%s+%i" % (self.md5sum, self.size) 83 else: 84 return self.md5sum 85 86 def _make_hex_prop(name, length): 87 # Build and return a new property with the given name that 88 # must be a hex string of the given length. 89 data_name = '_{}'.format(name) 90 def getter(self): 91 return getattr(self, data_name) 92 def setter(self, hex_str): 93 if not arvados.util.is_hex(hex_str, length): 94 raise ValueError("{} is not a {}-digit hex string: {!r}". 95 format(name, length, hex_str)) 96 setattr(self, data_name, hex_str) 97 return property(getter, setter) 98 99 md5sum = _make_hex_prop('md5sum', 32) 100 perm_sig = _make_hex_prop('perm_sig', 40) 101 102 @property 103 def perm_expiry(self): 104 return self._perm_expiry 105 106 @perm_expiry.setter 107 def perm_expiry(self, value): 108 if not arvados.util.is_hex(value, 1, 8): 109 raise ValueError( 110 "permission timestamp must be a hex Unix timestamp: {}". 111 format(value)) 112 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16)) 113 114 def permission_hint(self): 115 data = [self.perm_sig, self.perm_expiry] 116 if None in data: 117 return None 118 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds()) 119 return "A{}@{:08x}".format(*data) 120 121 def parse_permission_hint(self, s): 122 try: 123 self.perm_sig, self.perm_expiry = s[1:].split('@', 1) 124 except IndexError: 125 raise ValueError("bad permission hint {}".format(s)) 126 127 def permission_expired(self, as_of_dt=None): 128 if self.perm_expiry is None: 129 return False 130 elif as_of_dt is None: 131 as_of_dt = datetime.datetime.now() 132 return self.perm_expiry <= as_of_dt
55 def __init__(self, locator_str): 56 self.hints = [] 57 self._perm_sig = None 58 self._perm_expiry = None 59 pieces = iter(locator_str.split('+')) 60 self.md5sum = next(pieces) 61 try: 62 self.size = int(next(pieces)) 63 except StopIteration: 64 self.size = None 65 for hint in pieces: 66 if self.HINT_RE.match(hint) is None: 67 raise ValueError("invalid hint format: {}".format(hint)) 68 elif hint.startswith('A'): 69 self.parse_permission_hint(hint) 70 else: 71 self.hints.append(hint)
135class KeepBlockCache(object): 136 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None): 137 self.cache_max = cache_max 138 self._cache = collections.OrderedDict() 139 self._cache_lock = threading.Lock() 140 self._max_slots = max_slots 141 self._disk_cache = disk_cache 142 self._disk_cache_dir = disk_cache_dir 143 self._cache_updating = threading.Condition(self._cache_lock) 144 145 if self._disk_cache and self._disk_cache_dir is None: 146 self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep')) 147 148 if self._max_slots == 0: 149 if self._disk_cache: 150 # Each block uses two file descriptors, one used to 151 # open it initially and hold the flock(), and a second 152 # hidden one used by mmap(). 153 # 154 # Set max slots to 1/8 of maximum file handles. This 155 # means we'll use at most 1/4 of total file handles. 156 # 157 # NOFILE typically defaults to 1024 on Linux so this 158 # is 128 slots (256 file handles), which means we can 159 # cache up to 8 GiB of 64 MiB blocks. This leaves 160 # 768 file handles for sockets and other stuff. 161 # 162 # When we want the ability to have more cache (e.g. in 163 # arv-mount) we'll increase rlimit before calling 164 # this. 165 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8) 166 else: 167 # RAM cache slots 168 self._max_slots = 512 169 170 if self.cache_max == 0: 171 if self._disk_cache: 172 fs = os.statvfs(self._disk_cache_dir) 173 # Calculation of available space incorporates existing cache usage 174 existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir) 175 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4 176 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10) 177 # pick smallest of: 178 # 10% of total disk size 179 # 25% of available space 180 # max_slots * 64 MiB 181 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024)) 182 else: 183 # 256 MiB in RAM 184 self.cache_max = (256 * 1024 * 1024) 185 186 self.cache_max = max(self.cache_max, 64 * 1024 * 1024) 187 188 self.cache_total = 0 189 if self._disk_cache: 190 self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots) 191 for slot in self._cache.values(): 192 self.cache_total += slot.size() 193 self.cap_cache() 194 195 class _CacheSlot: 196 __slots__ = ("locator", "ready", "content") 197 198 def __init__(self, locator): 199 self.locator = locator 200 self.ready = threading.Event() 201 self.content = None 202 203 def get(self): 204 self.ready.wait() 205 return self.content 206 207 def set(self, value): 208 if self.content is not None: 209 return False 210 self.content = value 211 self.ready.set() 212 return True 213 214 def size(self): 215 if self.content is None: 216 return 0 217 else: 218 return len(self.content) 219 220 def evict(self): 221 self.content = None 222 223 224 def _resize_cache(self, cache_max, max_slots): 225 # Try and make sure the contents of the cache do not exceed 226 # the supplied maximums. 227 228 if self.cache_total <= cache_max and len(self._cache) <= max_slots: 229 return 230 231 _evict_candidates = collections.deque(self._cache.values()) 232 while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots): 233 slot = _evict_candidates.popleft() 234 if not slot.ready.is_set(): 235 continue 236 237 sz = slot.size() 238 slot.evict() 239 self.cache_total -= sz 240 del self._cache[slot.locator] 241 242 243 def cap_cache(self): 244 '''Cap the cache size to self.cache_max''' 245 with self._cache_updating: 246 self._resize_cache(self.cache_max, self._max_slots) 247 self._cache_updating.notify_all() 248 249 def _get(self, locator): 250 # Test if the locator is already in the cache 251 if locator in self._cache: 252 n = self._cache[locator] 253 if n.ready.is_set() and n.content is None: 254 del self._cache[n.locator] 255 return None 256 self._cache.move_to_end(locator) 257 return n 258 if self._disk_cache: 259 # see if it exists on disk 260 n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir) 261 if n is not None: 262 self._cache[n.locator] = n 263 self.cache_total += n.size() 264 return n 265 return None 266 267 def get(self, locator): 268 with self._cache_lock: 269 return self._get(locator) 270 271 def reserve_cache(self, locator): 272 '''Reserve a cache slot for the specified locator, 273 or return the existing slot.''' 274 with self._cache_updating: 275 n = self._get(locator) 276 if n: 277 return n, False 278 else: 279 # Add a new cache slot for the locator 280 self._resize_cache(self.cache_max, self._max_slots-1) 281 while len(self._cache) >= self._max_slots: 282 # If there isn't a slot available, need to wait 283 # for something to happen that releases one of the 284 # cache slots. Idle for 200 ms or woken up by 285 # another thread 286 self._cache_updating.wait(timeout=0.2) 287 self._resize_cache(self.cache_max, self._max_slots-1) 288 289 if self._disk_cache: 290 n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir) 291 else: 292 n = KeepBlockCache._CacheSlot(locator) 293 self._cache[n.locator] = n 294 return n, True 295 296 def set(self, slot, blob): 297 try: 298 if slot.set(blob): 299 self.cache_total += slot.size() 300 return 301 except OSError as e: 302 if e.errno == errno.ENOMEM: 303 # Reduce max slots to current - 4, cap cache and retry 304 with self._cache_lock: 305 self._max_slots = max(4, len(self._cache) - 4) 306 elif e.errno == errno.ENOSPC: 307 # Reduce disk max space to current - 256 MiB, cap cache and retry 308 with self._cache_lock: 309 sm = sum(st.size() for st in self._cache.values()) 310 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024)) 311 elif e.errno == errno.ENODEV: 312 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.") 313 except Exception as e: 314 pass 315 finally: 316 # Check if we should evict things from the cache. Either 317 # because we added a new thing or there was an error and 318 # we possibly adjusted the limits down, so we might need 319 # to push something out. 320 self.cap_cache() 321 322 try: 323 # Only gets here if there was an error the first time. The 324 # exception handler adjusts limits downward in some cases 325 # to free up resources, which would make the operation 326 # succeed. 327 if slot.set(blob): 328 self.cache_total += slot.size() 329 except Exception as e: 330 # It failed again. Give up. 331 slot.set(None) 332 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e)) 333 334 self.cap_cache() 335 336 def clear(self): 337 with self._cache_lock: 338 self._cache.clear() 339 self.cache_total = 0
136 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None): 137 self.cache_max = cache_max 138 self._cache = collections.OrderedDict() 139 self._cache_lock = threading.Lock() 140 self._max_slots = max_slots 141 self._disk_cache = disk_cache 142 self._disk_cache_dir = disk_cache_dir 143 self._cache_updating = threading.Condition(self._cache_lock) 144 145 if self._disk_cache and self._disk_cache_dir is None: 146 self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep')) 147 148 if self._max_slots == 0: 149 if self._disk_cache: 150 # Each block uses two file descriptors, one used to 151 # open it initially and hold the flock(), and a second 152 # hidden one used by mmap(). 153 # 154 # Set max slots to 1/8 of maximum file handles. This 155 # means we'll use at most 1/4 of total file handles. 156 # 157 # NOFILE typically defaults to 1024 on Linux so this 158 # is 128 slots (256 file handles), which means we can 159 # cache up to 8 GiB of 64 MiB blocks. This leaves 160 # 768 file handles for sockets and other stuff. 161 # 162 # When we want the ability to have more cache (e.g. in 163 # arv-mount) we'll increase rlimit before calling 164 # this. 165 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8) 166 else: 167 # RAM cache slots 168 self._max_slots = 512 169 170 if self.cache_max == 0: 171 if self._disk_cache: 172 fs = os.statvfs(self._disk_cache_dir) 173 # Calculation of available space incorporates existing cache usage 174 existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir) 175 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4 176 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10) 177 # pick smallest of: 178 # 10% of total disk size 179 # 25% of available space 180 # max_slots * 64 MiB 181 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024)) 182 else: 183 # 256 MiB in RAM 184 self.cache_max = (256 * 1024 * 1024) 185 186 self.cache_max = max(self.cache_max, 64 * 1024 * 1024) 187 188 self.cache_total = 0 189 if self._disk_cache: 190 self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots) 191 for slot in self._cache.values(): 192 self.cache_total += slot.size() 193 self.cap_cache()
243 def cap_cache(self): 244 '''Cap the cache size to self.cache_max''' 245 with self._cache_updating: 246 self._resize_cache(self.cache_max, self._max_slots) 247 self._cache_updating.notify_all()
Cap the cache size to self.cache_max
271 def reserve_cache(self, locator): 272 '''Reserve a cache slot for the specified locator, 273 or return the existing slot.''' 274 with self._cache_updating: 275 n = self._get(locator) 276 if n: 277 return n, False 278 else: 279 # Add a new cache slot for the locator 280 self._resize_cache(self.cache_max, self._max_slots-1) 281 while len(self._cache) >= self._max_slots: 282 # If there isn't a slot available, need to wait 283 # for something to happen that releases one of the 284 # cache slots. Idle for 200 ms or woken up by 285 # another thread 286 self._cache_updating.wait(timeout=0.2) 287 self._resize_cache(self.cache_max, self._max_slots-1) 288 289 if self._disk_cache: 290 n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir) 291 else: 292 n = KeepBlockCache._CacheSlot(locator) 293 self._cache[n.locator] = n 294 return n, True
Reserve a cache slot for the specified locator, or return the existing slot.
296 def set(self, slot, blob): 297 try: 298 if slot.set(blob): 299 self.cache_total += slot.size() 300 return 301 except OSError as e: 302 if e.errno == errno.ENOMEM: 303 # Reduce max slots to current - 4, cap cache and retry 304 with self._cache_lock: 305 self._max_slots = max(4, len(self._cache) - 4) 306 elif e.errno == errno.ENOSPC: 307 # Reduce disk max space to current - 256 MiB, cap cache and retry 308 with self._cache_lock: 309 sm = sum(st.size() for st in self._cache.values()) 310 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024)) 311 elif e.errno == errno.ENODEV: 312 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.") 313 except Exception as e: 314 pass 315 finally: 316 # Check if we should evict things from the cache. Either 317 # because we added a new thing or there was an error and 318 # we possibly adjusted the limits down, so we might need 319 # to push something out. 320 self.cap_cache() 321 322 try: 323 # Only gets here if there was an error the first time. The 324 # exception handler adjusts limits downward in some cases 325 # to free up resources, which would make the operation 326 # succeed. 327 if slot.set(blob): 328 self.cache_total += slot.size() 329 except Exception as e: 330 # It failed again. Give up. 331 slot.set(None) 332 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e)) 333 334 self.cap_cache()
355class KeepClient(object): 356 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT 357 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT 358 359 class _KeepService(PyCurlHelper): 360 """Make requests to a single Keep service, and track results. 361 362 A _KeepService is intended to last long enough to perform one 363 transaction (GET or PUT) against one Keep service. This can 364 involve calling either get() or put() multiple times in order 365 to retry after transient failures. However, calling both get() 366 and put() on a single instance -- or using the same instance 367 to access two different Keep services -- will not produce 368 sensible behavior. 369 """ 370 371 HTTP_ERRORS = ( 372 socket.error, 373 ssl.SSLError, 374 arvados.errors.HttpError, 375 ) 376 377 def __init__(self, root, user_agent_pool=queue.LifoQueue(), 378 upload_counter=None, 379 download_counter=None, 380 headers={}, 381 insecure=False): 382 super().__init__() 383 self.root = root 384 self._user_agent_pool = user_agent_pool 385 self._result = {'error': None} 386 self._usable = True 387 self._session = None 388 self._socket = None 389 self.get_headers = {'Accept': 'application/octet-stream'} 390 self.get_headers.update(headers) 391 self.put_headers = headers 392 self.upload_counter = upload_counter 393 self.download_counter = download_counter 394 self.insecure = insecure 395 396 def usable(self): 397 """Is it worth attempting a request?""" 398 return self._usable 399 400 def finished(self): 401 """Did the request succeed or encounter permanent failure?""" 402 return self._result['error'] == False or not self._usable 403 404 def last_result(self): 405 return self._result 406 407 def _get_user_agent(self): 408 try: 409 return self._user_agent_pool.get(block=False) 410 except queue.Empty: 411 return pycurl.Curl() 412 413 def _put_user_agent(self, ua): 414 try: 415 ua.reset() 416 self._user_agent_pool.put(ua, block=False) 417 except: 418 ua.close() 419 420 def get(self, locator, method="GET", timeout=None): 421 # locator is a KeepLocator object. 422 url = self.root + str(locator) 423 _logger.debug("Request: %s %s", method, url) 424 curl = self._get_user_agent() 425 ok = None 426 try: 427 with Timer() as t: 428 self._headers = {} 429 response_body = BytesIO() 430 curl.setopt(pycurl.NOSIGNAL, 1) 431 curl.setopt(pycurl.OPENSOCKETFUNCTION, 432 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 433 curl.setopt(pycurl.URL, url.encode('utf-8')) 434 curl.setopt(pycurl.HTTPHEADER, [ 435 '{}: {}'.format(k,v) for k,v in self.get_headers.items()]) 436 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 437 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 438 if self.insecure: 439 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 440 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 441 else: 442 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 443 if method == "HEAD": 444 curl.setopt(pycurl.NOBODY, True) 445 else: 446 curl.setopt(pycurl.HTTPGET, True) 447 self._setcurltimeouts(curl, timeout, method=="HEAD") 448 449 try: 450 curl.perform() 451 except Exception as e: 452 raise arvados.errors.HttpError(0, str(e)) 453 finally: 454 if self._socket: 455 self._socket.close() 456 self._socket = None 457 self._result = { 458 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 459 'body': response_body.getvalue(), 460 'headers': self._headers, 461 'error': False, 462 } 463 464 ok = retry.check_http_response_success(self._result['status_code']) 465 if not ok: 466 self._result['error'] = arvados.errors.HttpError( 467 self._result['status_code'], 468 self._headers.get('x-status-line', 'Error')) 469 except self.HTTP_ERRORS as e: 470 self._result = { 471 'error': e, 472 } 473 self._usable = ok != False 474 if self._result.get('status_code', None): 475 # The client worked well enough to get an HTTP status 476 # code, so presumably any problems are just on the 477 # server side and it's OK to reuse the client. 478 self._put_user_agent(curl) 479 else: 480 # Don't return this client to the pool, in case it's 481 # broken. 482 curl.close() 483 if not ok: 484 _logger.debug("Request fail: GET %s => %s: %s", 485 url, type(self._result['error']), str(self._result['error'])) 486 return None 487 if method == "HEAD": 488 _logger.info("HEAD %s: %s bytes", 489 self._result['status_code'], 490 self._result.get('content-length')) 491 if self._result['headers'].get('x-keep-locator'): 492 # This is a response to a remote block copy request, return 493 # the local copy block locator. 494 return self._result['headers'].get('x-keep-locator') 495 return True 496 497 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)", 498 self._result['status_code'], 499 len(self._result['body']), 500 t.msecs, 501 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0) 502 503 if self.download_counter: 504 self.download_counter.add(len(self._result['body'])) 505 resp_md5 = hashlib.md5(self._result['body']).hexdigest() 506 if resp_md5 != locator.md5sum: 507 _logger.warning("Checksum fail: md5(%s) = %s", 508 url, resp_md5) 509 self._result['error'] = arvados.errors.HttpError( 510 0, 'Checksum fail') 511 return None 512 return self._result['body'] 513 514 def put(self, hash_s, body, timeout=None, headers={}): 515 put_headers = copy.copy(self.put_headers) 516 put_headers.update(headers) 517 url = self.root + hash_s 518 _logger.debug("Request: PUT %s", url) 519 curl = self._get_user_agent() 520 ok = None 521 try: 522 with Timer() as t: 523 self._headers = {} 524 body_reader = BytesIO(body) 525 response_body = BytesIO() 526 curl.setopt(pycurl.NOSIGNAL, 1) 527 curl.setopt(pycurl.OPENSOCKETFUNCTION, 528 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 529 curl.setopt(pycurl.URL, url.encode('utf-8')) 530 # Using UPLOAD tells cURL to wait for a "go ahead" from the 531 # Keep server (in the form of a HTTP/1.1 "100 Continue" 532 # response) instead of sending the request body immediately. 533 # This allows the server to reject the request if the request 534 # is invalid or the server is read-only, without waiting for 535 # the client to send the entire block. 536 curl.setopt(pycurl.UPLOAD, True) 537 curl.setopt(pycurl.INFILESIZE, len(body)) 538 curl.setopt(pycurl.READFUNCTION, body_reader.read) 539 curl.setopt(pycurl.HTTPHEADER, [ 540 '{}: {}'.format(k,v) for k,v in put_headers.items()]) 541 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 542 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 543 if self.insecure: 544 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 545 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 546 else: 547 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 548 self._setcurltimeouts(curl, timeout) 549 try: 550 curl.perform() 551 except Exception as e: 552 raise arvados.errors.HttpError(0, str(e)) 553 finally: 554 if self._socket: 555 self._socket.close() 556 self._socket = None 557 self._result = { 558 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 559 'body': response_body.getvalue().decode('utf-8'), 560 'headers': self._headers, 561 'error': False, 562 } 563 ok = retry.check_http_response_success(self._result['status_code']) 564 if not ok: 565 self._result['error'] = arvados.errors.HttpError( 566 self._result['status_code'], 567 self._headers.get('x-status-line', 'Error')) 568 except self.HTTP_ERRORS as e: 569 self._result = { 570 'error': e, 571 } 572 self._usable = ok != False # still usable if ok is True or None 573 if self._result.get('status_code', None): 574 # Client is functional. See comment in get(). 575 self._put_user_agent(curl) 576 else: 577 curl.close() 578 if not ok: 579 _logger.debug("Request fail: PUT %s => %s: %s", 580 url, type(self._result['error']), str(self._result['error'])) 581 return False 582 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)", 583 self._result['status_code'], 584 len(body), 585 t.msecs, 586 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0) 587 if self.upload_counter: 588 self.upload_counter.add(len(body)) 589 return True 590 591 592 class _KeepWriterQueue(queue.Queue): 593 def __init__(self, copies, classes=[]): 594 queue.Queue.__init__(self) # Old-style superclass 595 self.wanted_copies = copies 596 self.wanted_storage_classes = classes 597 self.successful_copies = 0 598 self.confirmed_storage_classes = {} 599 self.response = None 600 self.storage_classes_tracking = True 601 self.queue_data_lock = threading.RLock() 602 self.pending_tries = max(copies, len(classes)) 603 self.pending_tries_notification = threading.Condition() 604 605 def write_success(self, response, replicas_nr, classes_confirmed): 606 with self.queue_data_lock: 607 self.successful_copies += replicas_nr 608 if classes_confirmed is None: 609 self.storage_classes_tracking = False 610 elif self.storage_classes_tracking: 611 for st_class, st_copies in classes_confirmed.items(): 612 try: 613 self.confirmed_storage_classes[st_class] += st_copies 614 except KeyError: 615 self.confirmed_storage_classes[st_class] = st_copies 616 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes())) 617 self.response = response 618 with self.pending_tries_notification: 619 self.pending_tries_notification.notify_all() 620 621 def write_fail(self, ks): 622 with self.pending_tries_notification: 623 self.pending_tries += 1 624 self.pending_tries_notification.notify() 625 626 def pending_copies(self): 627 with self.queue_data_lock: 628 return self.wanted_copies - self.successful_copies 629 630 def satisfied_classes(self): 631 with self.queue_data_lock: 632 if not self.storage_classes_tracking: 633 # Notifies disabled storage classes expectation to 634 # the outer loop. 635 return None 636 return list(set(self.wanted_storage_classes) - set(self.pending_classes())) 637 638 def pending_classes(self): 639 with self.queue_data_lock: 640 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None): 641 return [] 642 unsatisfied_classes = copy.copy(self.wanted_storage_classes) 643 for st_class, st_copies in self.confirmed_storage_classes.items(): 644 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies: 645 unsatisfied_classes.remove(st_class) 646 return unsatisfied_classes 647 648 def get_next_task(self): 649 with self.pending_tries_notification: 650 while True: 651 if self.pending_copies() < 1 and len(self.pending_classes()) == 0: 652 # This notify_all() is unnecessary -- 653 # write_success() already called notify_all() 654 # when pending<1 became true, so it's not 655 # possible for any other thread to be in 656 # wait() now -- but it's cheap insurance 657 # against deadlock so we do it anyway: 658 self.pending_tries_notification.notify_all() 659 # Drain the queue and then raise Queue.Empty 660 while True: 661 self.get_nowait() 662 self.task_done() 663 elif self.pending_tries > 0: 664 service, service_root = self.get_nowait() 665 if service.finished(): 666 self.task_done() 667 continue 668 self.pending_tries -= 1 669 return service, service_root 670 elif self.empty(): 671 self.pending_tries_notification.notify_all() 672 raise queue.Empty 673 else: 674 self.pending_tries_notification.wait() 675 676 677 class _KeepWriterThreadPool: 678 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]): 679 self.total_task_nr = 0 680 if (not max_service_replicas) or (max_service_replicas >= copies): 681 num_threads = 1 682 else: 683 num_threads = int(math.ceil(1.0*copies/max_service_replicas)) 684 _logger.debug("Pool max threads is %d", num_threads) 685 self.workers = [] 686 self.queue = KeepClient._KeepWriterQueue(copies, classes) 687 # Create workers 688 for _ in range(num_threads): 689 w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout) 690 self.workers.append(w) 691 692 def add_task(self, ks, service_root): 693 self.queue.put((ks, service_root)) 694 self.total_task_nr += 1 695 696 def done(self): 697 return self.queue.successful_copies, self.queue.satisfied_classes() 698 699 def join(self): 700 # Start workers 701 for worker in self.workers: 702 worker.start() 703 # Wait for finished work 704 self.queue.join() 705 706 def response(self): 707 return self.queue.response 708 709 710 class _KeepWriterThread(threading.Thread): 711 class TaskFailed(RuntimeError): 712 """Exception for failed Keep writes 713 714 TODO: Move this class to the module top level and document it 715 716 @private 717 """ 718 719 720 def __init__(self, queue, data, data_hash, timeout=None): 721 super().__init__() 722 self.timeout = timeout 723 self.queue = queue 724 self.data = data 725 self.data_hash = data_hash 726 self.daemon = True 727 728 def run(self): 729 while True: 730 try: 731 service, service_root = self.queue.get_next_task() 732 except queue.Empty: 733 return 734 try: 735 locator, copies, classes = self.do_task(service, service_root) 736 except Exception as e: 737 if not isinstance(e, self.TaskFailed): 738 _logger.exception("Exception in _KeepWriterThread") 739 self.queue.write_fail(service) 740 else: 741 self.queue.write_success(locator, copies, classes) 742 finally: 743 self.queue.task_done() 744 745 def do_task(self, service, service_root): 746 classes = self.queue.pending_classes() 747 headers = {} 748 if len(classes) > 0: 749 classes.sort() 750 headers['X-Keep-Storage-Classes'] = ', '.join(classes) 751 success = bool(service.put(self.data_hash, 752 self.data, 753 timeout=self.timeout, 754 headers=headers)) 755 result = service.last_result() 756 757 if not success: 758 if result.get('status_code'): 759 _logger.debug("Request fail: PUT %s => %s %s", 760 self.data_hash, 761 result.get('status_code'), 762 result.get('body')) 763 raise self.TaskFailed() 764 765 _logger.debug("_KeepWriterThread %s succeeded %s+%i %s", 766 str(threading.current_thread()), 767 self.data_hash, 768 len(self.data), 769 service_root) 770 try: 771 replicas_stored = int(result['headers']['x-keep-replicas-stored']) 772 except (KeyError, ValueError): 773 replicas_stored = 1 774 775 classes_confirmed = {} 776 try: 777 scch = result['headers']['x-keep-storage-classes-confirmed'] 778 for confirmation in scch.replace(' ', '').split(','): 779 if '=' in confirmation: 780 stored_class, stored_copies = confirmation.split('=')[:2] 781 classes_confirmed[stored_class] = int(stored_copies) 782 except (KeyError, ValueError): 783 # Storage classes confirmed header missing or corrupt 784 classes_confirmed = None 785 786 return result['body'].strip(), replicas_stored, classes_confirmed 787 788 789 def __init__(self, api_client=None, proxy=None, 790 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT, 791 api_token=None, local_store=None, block_cache=None, 792 num_retries=10, session=None, num_prefetch_threads=None): 793 """Initialize a new KeepClient. 794 795 Arguments: 796 :api_client: 797 The API client to use to find Keep services. If not 798 provided, KeepClient will build one from available Arvados 799 configuration. 800 801 :proxy: 802 If specified, this KeepClient will send requests to this Keep 803 proxy. Otherwise, KeepClient will fall back to the setting of the 804 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings. 805 If you want to KeepClient does not use a proxy, pass in an empty 806 string. 807 808 :timeout: 809 The initial timeout (in seconds) for HTTP requests to Keep 810 non-proxy servers. A tuple of three floats is interpreted as 811 (connection_timeout, read_timeout, minimum_bandwidth). A connection 812 will be aborted if the average traffic rate falls below 813 minimum_bandwidth bytes per second over an interval of read_timeout 814 seconds. Because timeouts are often a result of transient server 815 load, the actual connection timeout will be increased by a factor 816 of two on each retry. 817 Default: (2, 256, 32768). 818 819 :proxy_timeout: 820 The initial timeout (in seconds) for HTTP requests to 821 Keep proxies. A tuple of three floats is interpreted as 822 (connection_timeout, read_timeout, minimum_bandwidth). The behavior 823 described above for adjusting connection timeouts on retry also 824 applies. 825 Default: (20, 256, 32768). 826 827 :api_token: 828 If you're not using an API client, but only talking 829 directly to a Keep proxy, this parameter specifies an API token 830 to authenticate Keep requests. It is an error to specify both 831 api_client and api_token. If you specify neither, KeepClient 832 will use one available from the Arvados configuration. 833 834 :local_store: 835 If specified, this KeepClient will bypass Keep 836 services, and save data to the named directory. If unspecified, 837 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE 838 environment variable. If you want to ensure KeepClient does not 839 use local storage, pass in an empty string. This is primarily 840 intended to mock a server for testing. 841 842 :num_retries: 843 The default number of times to retry failed requests. 844 This will be used as the default num_retries value when get() and 845 put() are called. Default 10. 846 """ 847 self.lock = threading.Lock() 848 if proxy is None: 849 if config.get('ARVADOS_KEEP_SERVICES'): 850 proxy = config.get('ARVADOS_KEEP_SERVICES') 851 else: 852 proxy = config.get('ARVADOS_KEEP_PROXY') 853 if api_token is None: 854 if api_client is None: 855 api_token = config.get('ARVADOS_API_TOKEN') 856 else: 857 api_token = api_client.api_token 858 elif api_client is not None: 859 raise ValueError( 860 "can't build KeepClient with both API client and token") 861 if local_store is None: 862 local_store = os.environ.get('KEEP_LOCAL_STORE') 863 864 if api_client is None: 865 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE') 866 else: 867 self.insecure = api_client.insecure 868 869 self.block_cache = block_cache if block_cache else KeepBlockCache() 870 self.timeout = timeout 871 self.proxy_timeout = proxy_timeout 872 self._user_agent_pool = queue.LifoQueue() 873 self.upload_counter = _Counter() 874 self.download_counter = _Counter() 875 self.put_counter = _Counter() 876 self.get_counter = _Counter() 877 self.hits_counter = _Counter() 878 self.misses_counter = _Counter() 879 self._storage_classes_unsupported_warning = False 880 self._default_classes = [] 881 if num_prefetch_threads is not None: 882 self.num_prefetch_threads = num_prefetch_threads 883 else: 884 self.num_prefetch_threads = 2 885 self._prefetch_queue = None 886 self._prefetch_threads = None 887 888 if local_store: 889 self.local_store = local_store 890 self.head = self.local_store_head 891 self.get = self.local_store_get 892 self.put = self.local_store_put 893 else: 894 self.num_retries = num_retries 895 self.max_replicas_per_service = None 896 if proxy: 897 proxy_uris = proxy.split() 898 for i in range(len(proxy_uris)): 899 if not proxy_uris[i].endswith('/'): 900 proxy_uris[i] += '/' 901 # URL validation 902 url = urllib.parse.urlparse(proxy_uris[i]) 903 if not (url.scheme and url.netloc): 904 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i])) 905 self.api_token = api_token 906 self._gateway_services = {} 907 self._keep_services = [{ 908 'uuid': "00000-bi6l4-%015d" % idx, 909 'service_type': 'proxy', 910 '_service_root': uri, 911 } for idx, uri in enumerate(proxy_uris)] 912 self._writable_services = self._keep_services 913 self.using_proxy = True 914 self._static_services_list = True 915 else: 916 # It's important to avoid instantiating an API client 917 # unless we actually need one, for testing's sake. 918 if api_client is None: 919 api_client = arvados.api('v1') 920 self.api_client = api_client 921 self.api_token = api_client.api_token 922 self._gateway_services = {} 923 self._keep_services = None 924 self._writable_services = None 925 self.using_proxy = None 926 self._static_services_list = False 927 try: 928 self._default_classes = [ 929 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']] 930 except KeyError: 931 # We're talking to an old cluster 932 pass 933 934 def current_timeout(self, attempt_number): 935 """Return the appropriate timeout to use for this client. 936 937 The proxy timeout setting if the backend service is currently a proxy, 938 the regular timeout setting otherwise. The `attempt_number` indicates 939 how many times the operation has been tried already (starting from 0 940 for the first try), and scales the connection timeout portion of the 941 return value accordingly. 942 943 """ 944 # TODO(twp): the timeout should be a property of a 945 # _KeepService, not a KeepClient. See #4488. 946 t = self.proxy_timeout if self.using_proxy else self.timeout 947 if len(t) == 2: 948 return (t[0] * (1 << attempt_number), t[1]) 949 else: 950 return (t[0] * (1 << attempt_number), t[1], t[2]) 951 def _any_nondisk_services(self, service_list): 952 return any(ks.get('service_type', 'disk') != 'disk' 953 for ks in service_list) 954 955 def build_services_list(self, force_rebuild=False): 956 if (self._static_services_list or 957 (self._keep_services and not force_rebuild)): 958 return 959 with self.lock: 960 try: 961 keep_services = self.api_client.keep_services().accessible() 962 except Exception: # API server predates Keep services. 963 keep_services = self.api_client.keep_disks().list() 964 965 # Gateway services are only used when specified by UUID, 966 # so there's nothing to gain by filtering them by 967 # service_type. 968 self._gateway_services = {ks['uuid']: ks for ks in 969 keep_services.execute()['items']} 970 if not self._gateway_services: 971 raise arvados.errors.NoKeepServersError() 972 973 # Precompute the base URI for each service. 974 for r in self._gateway_services.values(): 975 host = r['service_host'] 976 if not host.startswith('[') and host.find(':') >= 0: 977 # IPv6 URIs must be formatted like http://[::1]:80/... 978 host = '[' + host + ']' 979 r['_service_root'] = "{}://{}:{:d}/".format( 980 'https' if r['service_ssl_flag'] else 'http', 981 host, 982 r['service_port']) 983 984 _logger.debug(str(self._gateway_services)) 985 self._keep_services = [ 986 ks for ks in self._gateway_services.values() 987 if not ks.get('service_type', '').startswith('gateway:')] 988 self._writable_services = [ks for ks in self._keep_services 989 if not ks.get('read_only')] 990 991 # For disk type services, max_replicas_per_service is 1 992 # It is unknown (unlimited) for other service types. 993 if self._any_nondisk_services(self._writable_services): 994 self.max_replicas_per_service = None 995 else: 996 self.max_replicas_per_service = 1 997 998 def _service_weight(self, data_hash, service_uuid): 999 """Compute the weight of a Keep service endpoint for a data 1000 block with a known hash. 1001 1002 The weight is md5(h + u) where u is the last 15 characters of 1003 the service endpoint's UUID. 1004 """ 1005 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest() 1006 1007 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False): 1008 """Return an array of Keep service endpoints, in the order in 1009 which they should be probed when reading or writing data with 1010 the given hash+hints. 1011 """ 1012 self.build_services_list(force_rebuild) 1013 1014 sorted_roots = [] 1015 # Use the services indicated by the given +K@... remote 1016 # service hints, if any are present and can be resolved to a 1017 # URI. 1018 for hint in locator.hints: 1019 if hint.startswith('K@'): 1020 if len(hint) == 7: 1021 sorted_roots.append( 1022 "https://keep.{}.arvadosapi.com/".format(hint[2:])) 1023 elif len(hint) == 29: 1024 svc = self._gateway_services.get(hint[2:]) 1025 if svc: 1026 sorted_roots.append(svc['_service_root']) 1027 1028 # Sort the available local services by weight (heaviest first) 1029 # for this locator, and return their service_roots (base URIs) 1030 # in that order. 1031 use_services = self._keep_services 1032 if need_writable: 1033 use_services = self._writable_services 1034 self.using_proxy = self._any_nondisk_services(use_services) 1035 sorted_roots.extend([ 1036 svc['_service_root'] for svc in sorted( 1037 use_services, 1038 reverse=True, 1039 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))]) 1040 _logger.debug("{}: {}".format(locator, sorted_roots)) 1041 return sorted_roots 1042 1043 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers): 1044 # roots_map is a dictionary, mapping Keep service root strings 1045 # to _KeepService objects. Poll for Keep services, and add any 1046 # new ones to roots_map. Return the current list of local 1047 # root strings. 1048 headers.setdefault('Authorization', "Bearer %s" % (self.api_token,)) 1049 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable) 1050 for root in local_roots: 1051 if root not in roots_map: 1052 roots_map[root] = self._KeepService( 1053 root, self._user_agent_pool, 1054 upload_counter=self.upload_counter, 1055 download_counter=self.download_counter, 1056 headers=headers, 1057 insecure=self.insecure) 1058 return local_roots 1059 1060 @staticmethod 1061 def _check_loop_result(result): 1062 # KeepClient RetryLoops should save results as a 2-tuple: the 1063 # actual result of the request, and the number of servers available 1064 # to receive the request this round. 1065 # This method returns True if there's a real result, False if 1066 # there are no more servers available, otherwise None. 1067 if isinstance(result, Exception): 1068 return None 1069 result, tried_server_count = result 1070 if (result is not None) and (result is not False): 1071 return True 1072 elif tried_server_count < 1: 1073 _logger.info("No more Keep services to try; giving up") 1074 return False 1075 else: 1076 return None 1077 1078 def get_from_cache(self, loc_s): 1079 """Fetch a block only if is in the cache, otherwise return None.""" 1080 locator = KeepLocator(loc_s) 1081 slot = self.block_cache.get(locator.md5sum) 1082 if slot is not None and slot.ready.is_set(): 1083 return slot.get() 1084 else: 1085 return None 1086 1087 def refresh_signature(self, loc): 1088 """Ask Keep to get the remote block and return its local signature""" 1089 now = datetime.datetime.utcnow().isoformat("T") + 'Z' 1090 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)}) 1091 1092 @retry.retry_method 1093 def head(self, loc_s, **kwargs): 1094 return self._get_or_head(loc_s, method="HEAD", **kwargs) 1095 1096 @retry.retry_method 1097 def get(self, loc_s, **kwargs): 1098 return self._get_or_head(loc_s, method="GET", **kwargs) 1099 1100 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False): 1101 """Get data from Keep. 1102 1103 This method fetches one or more blocks of data from Keep. It 1104 sends a request each Keep service registered with the API 1105 server (or the proxy provided when this client was 1106 instantiated), then each service named in location hints, in 1107 sequence. As soon as one service provides the data, it's 1108 returned. 1109 1110 Arguments: 1111 * loc_s: A string of one or more comma-separated locators to fetch. 1112 This method returns the concatenation of these blocks. 1113 * num_retries: The number of times to retry GET requests to 1114 *each* Keep server if it returns temporary failures, with 1115 exponential backoff. Note that, in each loop, the method may try 1116 to fetch data from every available Keep service, along with any 1117 that are named in location hints in the locator. The default value 1118 is set when the KeepClient is initialized. 1119 """ 1120 if ',' in loc_s: 1121 return ''.join(self.get(x) for x in loc_s.split(',')) 1122 1123 self.get_counter.add(1) 1124 1125 request_id = (request_id or 1126 (hasattr(self, 'api_client') and self.api_client.request_id) or 1127 arvados.util.new_request_id()) 1128 if headers is None: 1129 headers = {} 1130 headers['X-Request-Id'] = request_id 1131 1132 slot = None 1133 blob = None 1134 try: 1135 locator = KeepLocator(loc_s) 1136 if method == "GET": 1137 while slot is None: 1138 slot, first = self.block_cache.reserve_cache(locator.md5sum) 1139 if first: 1140 # Fresh and empty "first time it is used" slot 1141 break 1142 if prefetch: 1143 # this is request for a prefetch to fill in 1144 # the cache, don't need to wait for the 1145 # result, so if it is already in flight return 1146 # immediately. Clear 'slot' to prevent 1147 # finally block from calling slot.set() 1148 if slot.ready.is_set(): 1149 slot.get() 1150 slot = None 1151 return None 1152 1153 blob = slot.get() 1154 if blob is not None: 1155 self.hits_counter.add(1) 1156 return blob 1157 1158 # If blob is None, this means either 1159 # 1160 # (a) another thread was fetching this block and 1161 # failed with an error or 1162 # 1163 # (b) cache thrashing caused the slot to be 1164 # evicted (content set to None) by another thread 1165 # between the call to reserve_cache() and get(). 1166 # 1167 # We'll handle these cases by reserving a new slot 1168 # and then doing a full GET request. 1169 slot = None 1170 1171 self.misses_counter.add(1) 1172 1173 # If the locator has hints specifying a prefix (indicating a 1174 # remote keepproxy) or the UUID of a local gateway service, 1175 # read data from the indicated service(s) instead of the usual 1176 # list of local disk services. 1177 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:]) 1178 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7] 1179 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root'] 1180 for hint in locator.hints if ( 1181 hint.startswith('K@') and 1182 len(hint) == 29 and 1183 self._gateway_services.get(hint[2:]) 1184 )]) 1185 # Map root URLs to their _KeepService objects. 1186 roots_map = { 1187 root: self._KeepService(root, self._user_agent_pool, 1188 upload_counter=self.upload_counter, 1189 download_counter=self.download_counter, 1190 headers=headers, 1191 insecure=self.insecure) 1192 for root in hint_roots 1193 } 1194 1195 # See #3147 for a discussion of the loop implementation. Highlights: 1196 # * Refresh the list of Keep services after each failure, in case 1197 # it's being updated. 1198 # * Retry until we succeed, we're out of retries, or every available 1199 # service has returned permanent failure. 1200 sorted_roots = [] 1201 roots_map = {} 1202 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1203 backoff_start=2) 1204 for tries_left in loop: 1205 try: 1206 sorted_roots = self.map_new_services( 1207 roots_map, locator, 1208 force_rebuild=(tries_left < num_retries), 1209 need_writable=False, 1210 headers=headers) 1211 except Exception as error: 1212 loop.save_result(error) 1213 continue 1214 1215 # Query _KeepService objects that haven't returned 1216 # permanent failure, in our specified shuffle order. 1217 services_to_try = [roots_map[root] 1218 for root in sorted_roots 1219 if roots_map[root].usable()] 1220 for keep_service in services_to_try: 1221 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left)) 1222 if blob is not None: 1223 break 1224 loop.save_result((blob, len(services_to_try))) 1225 1226 # Always cache the result, then return it if we succeeded. 1227 if loop.success(): 1228 return blob 1229 finally: 1230 if slot is not None: 1231 self.block_cache.set(slot, blob) 1232 1233 # Q: Including 403 is necessary for the Keep tests to continue 1234 # passing, but maybe they should expect KeepReadError instead? 1235 not_founds = sum(1 for key in sorted_roots 1236 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410}) 1237 service_errors = ((key, roots_map[key].last_result()['error']) 1238 for key in sorted_roots) 1239 if not roots_map: 1240 raise arvados.errors.KeepReadError( 1241 "[{}] failed to read {}: no Keep services available ({})".format( 1242 request_id, loc_s, loop.last_result())) 1243 elif not_founds == len(sorted_roots): 1244 raise arvados.errors.NotFoundError( 1245 "[{}] {} not found".format(request_id, loc_s), service_errors) 1246 else: 1247 raise arvados.errors.KeepReadError( 1248 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service") 1249 1250 @retry.retry_method 1251 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None): 1252 """Save data in Keep. 1253 1254 This method will get a list of Keep services from the API server, and 1255 send the data to each one simultaneously in a new thread. Once the 1256 uploads are finished, if enough copies are saved, this method returns 1257 the most recent HTTP response body. If requests fail to upload 1258 enough copies, this method raises KeepWriteError. 1259 1260 Arguments: 1261 * data: The string of data to upload. 1262 * copies: The number of copies that the user requires be saved. 1263 Default 2. 1264 * num_retries: The number of times to retry PUT requests to 1265 *each* Keep server if it returns temporary failures, with 1266 exponential backoff. The default value is set when the 1267 KeepClient is initialized. 1268 * classes: An optional list of storage class names where copies should 1269 be written. 1270 """ 1271 1272 classes = classes or self._default_classes 1273 1274 if not isinstance(data, bytes): 1275 data = data.encode() 1276 1277 self.put_counter.add(1) 1278 1279 data_hash = hashlib.md5(data).hexdigest() 1280 loc_s = data_hash + '+' + str(len(data)) 1281 if copies < 1: 1282 return loc_s 1283 locator = KeepLocator(loc_s) 1284 1285 request_id = (request_id or 1286 (hasattr(self, 'api_client') and self.api_client.request_id) or 1287 arvados.util.new_request_id()) 1288 headers = { 1289 'X-Request-Id': request_id, 1290 'X-Keep-Desired-Replicas': str(copies), 1291 } 1292 roots_map = {} 1293 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1294 backoff_start=2) 1295 done_copies = 0 1296 done_classes = [] 1297 for tries_left in loop: 1298 try: 1299 sorted_roots = self.map_new_services( 1300 roots_map, locator, 1301 force_rebuild=(tries_left < num_retries), 1302 need_writable=True, 1303 headers=headers) 1304 except Exception as error: 1305 loop.save_result(error) 1306 continue 1307 1308 pending_classes = [] 1309 if done_classes is not None: 1310 pending_classes = list(set(classes) - set(done_classes)) 1311 writer_pool = KeepClient._KeepWriterThreadPool( 1312 data=data, 1313 data_hash=data_hash, 1314 copies=copies - done_copies, 1315 max_service_replicas=self.max_replicas_per_service, 1316 timeout=self.current_timeout(num_retries - tries_left), 1317 classes=pending_classes, 1318 ) 1319 for service_root, ks in [(root, roots_map[root]) 1320 for root in sorted_roots]: 1321 if ks.finished(): 1322 continue 1323 writer_pool.add_task(ks, service_root) 1324 writer_pool.join() 1325 pool_copies, pool_classes = writer_pool.done() 1326 done_copies += pool_copies 1327 if (done_classes is not None) and (pool_classes is not None): 1328 done_classes += pool_classes 1329 loop.save_result( 1330 (done_copies >= copies and set(done_classes) == set(classes), 1331 writer_pool.total_task_nr)) 1332 else: 1333 # Old keepstore contacted without storage classes support: 1334 # success is determined only by successful copies. 1335 # 1336 # Disable storage classes tracking from this point forward. 1337 if not self._storage_classes_unsupported_warning: 1338 self._storage_classes_unsupported_warning = True 1339 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster") 1340 done_classes = None 1341 loop.save_result( 1342 (done_copies >= copies, writer_pool.total_task_nr)) 1343 1344 if loop.success(): 1345 return writer_pool.response() 1346 if not roots_map: 1347 raise arvados.errors.KeepWriteError( 1348 "[{}] failed to write {}: no Keep services available ({})".format( 1349 request_id, data_hash, loop.last_result())) 1350 else: 1351 service_errors = ((key, roots_map[key].last_result()['error']) 1352 for key in sorted_roots 1353 if roots_map[key].last_result()['error']) 1354 raise arvados.errors.KeepWriteError( 1355 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format( 1356 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service") 1357 1358 def _block_prefetch_worker(self): 1359 """The background downloader thread.""" 1360 while True: 1361 try: 1362 b = self._prefetch_queue.get() 1363 if b is None: 1364 return 1365 self.get(b, prefetch=True) 1366 except Exception: 1367 _logger.exception("Exception doing block prefetch") 1368 1369 def _start_prefetch_threads(self): 1370 if self._prefetch_threads is None: 1371 with self.lock: 1372 if self._prefetch_threads is not None: 1373 return 1374 self._prefetch_queue = queue.Queue() 1375 self._prefetch_threads = [] 1376 for i in range(0, self.num_prefetch_threads): 1377 thread = threading.Thread(target=self._block_prefetch_worker) 1378 self._prefetch_threads.append(thread) 1379 thread.daemon = True 1380 thread.start() 1381 1382 def block_prefetch(self, locator): 1383 """ 1384 This relies on the fact that KeepClient implements a block cache, 1385 so repeated requests for the same block will not result in repeated 1386 downloads (unless the block is evicted from the cache.) This method 1387 does not block. 1388 """ 1389 1390 if self.block_cache.get(locator) is not None: 1391 return 1392 1393 self._start_prefetch_threads() 1394 self._prefetch_queue.put(locator) 1395 1396 def stop_prefetch_threads(self): 1397 with self.lock: 1398 if self._prefetch_threads is not None: 1399 for t in self._prefetch_threads: 1400 self._prefetch_queue.put(None) 1401 for t in self._prefetch_threads: 1402 t.join() 1403 self._prefetch_threads = None 1404 self._prefetch_queue = None 1405 1406 def local_store_put(self, data, copies=1, num_retries=None, classes=[]): 1407 """A stub for put(). 1408 1409 This method is used in place of the real put() method when 1410 using local storage (see constructor's local_store argument). 1411 1412 copies and num_retries arguments are ignored: they are here 1413 only for the sake of offering the same call signature as 1414 put(). 1415 1416 Data stored this way can be retrieved via local_store_get(). 1417 """ 1418 md5 = hashlib.md5(data).hexdigest() 1419 locator = '%s+%d' % (md5, len(data)) 1420 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f: 1421 f.write(data) 1422 os.rename(os.path.join(self.local_store, md5 + '.tmp'), 1423 os.path.join(self.local_store, md5)) 1424 return locator 1425 1426 def local_store_get(self, loc_s, num_retries=None): 1427 """Companion to local_store_put().""" 1428 try: 1429 locator = KeepLocator(loc_s) 1430 except ValueError: 1431 raise arvados.errors.NotFoundError( 1432 "Invalid data locator: '%s'" % loc_s) 1433 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1434 return b'' 1435 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f: 1436 return f.read() 1437 1438 def local_store_head(self, loc_s, num_retries=None): 1439 """Companion to local_store_put().""" 1440 try: 1441 locator = KeepLocator(loc_s) 1442 except ValueError: 1443 raise arvados.errors.NotFoundError( 1444 "Invalid data locator: '%s'" % loc_s) 1445 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1446 return True 1447 if os.path.exists(os.path.join(self.local_store, locator.md5sum)): 1448 return True
789 def __init__(self, api_client=None, proxy=None, 790 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT, 791 api_token=None, local_store=None, block_cache=None, 792 num_retries=10, session=None, num_prefetch_threads=None): 793 """Initialize a new KeepClient. 794 795 Arguments: 796 :api_client: 797 The API client to use to find Keep services. If not 798 provided, KeepClient will build one from available Arvados 799 configuration. 800 801 :proxy: 802 If specified, this KeepClient will send requests to this Keep 803 proxy. Otherwise, KeepClient will fall back to the setting of the 804 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings. 805 If you want to KeepClient does not use a proxy, pass in an empty 806 string. 807 808 :timeout: 809 The initial timeout (in seconds) for HTTP requests to Keep 810 non-proxy servers. A tuple of three floats is interpreted as 811 (connection_timeout, read_timeout, minimum_bandwidth). A connection 812 will be aborted if the average traffic rate falls below 813 minimum_bandwidth bytes per second over an interval of read_timeout 814 seconds. Because timeouts are often a result of transient server 815 load, the actual connection timeout will be increased by a factor 816 of two on each retry. 817 Default: (2, 256, 32768). 818 819 :proxy_timeout: 820 The initial timeout (in seconds) for HTTP requests to 821 Keep proxies. A tuple of three floats is interpreted as 822 (connection_timeout, read_timeout, minimum_bandwidth). The behavior 823 described above for adjusting connection timeouts on retry also 824 applies. 825 Default: (20, 256, 32768). 826 827 :api_token: 828 If you're not using an API client, but only talking 829 directly to a Keep proxy, this parameter specifies an API token 830 to authenticate Keep requests. It is an error to specify both 831 api_client and api_token. If you specify neither, KeepClient 832 will use one available from the Arvados configuration. 833 834 :local_store: 835 If specified, this KeepClient will bypass Keep 836 services, and save data to the named directory. If unspecified, 837 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE 838 environment variable. If you want to ensure KeepClient does not 839 use local storage, pass in an empty string. This is primarily 840 intended to mock a server for testing. 841 842 :num_retries: 843 The default number of times to retry failed requests. 844 This will be used as the default num_retries value when get() and 845 put() are called. Default 10. 846 """ 847 self.lock = threading.Lock() 848 if proxy is None: 849 if config.get('ARVADOS_KEEP_SERVICES'): 850 proxy = config.get('ARVADOS_KEEP_SERVICES') 851 else: 852 proxy = config.get('ARVADOS_KEEP_PROXY') 853 if api_token is None: 854 if api_client is None: 855 api_token = config.get('ARVADOS_API_TOKEN') 856 else: 857 api_token = api_client.api_token 858 elif api_client is not None: 859 raise ValueError( 860 "can't build KeepClient with both API client and token") 861 if local_store is None: 862 local_store = os.environ.get('KEEP_LOCAL_STORE') 863 864 if api_client is None: 865 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE') 866 else: 867 self.insecure = api_client.insecure 868 869 self.block_cache = block_cache if block_cache else KeepBlockCache() 870 self.timeout = timeout 871 self.proxy_timeout = proxy_timeout 872 self._user_agent_pool = queue.LifoQueue() 873 self.upload_counter = _Counter() 874 self.download_counter = _Counter() 875 self.put_counter = _Counter() 876 self.get_counter = _Counter() 877 self.hits_counter = _Counter() 878 self.misses_counter = _Counter() 879 self._storage_classes_unsupported_warning = False 880 self._default_classes = [] 881 if num_prefetch_threads is not None: 882 self.num_prefetch_threads = num_prefetch_threads 883 else: 884 self.num_prefetch_threads = 2 885 self._prefetch_queue = None 886 self._prefetch_threads = None 887 888 if local_store: 889 self.local_store = local_store 890 self.head = self.local_store_head 891 self.get = self.local_store_get 892 self.put = self.local_store_put 893 else: 894 self.num_retries = num_retries 895 self.max_replicas_per_service = None 896 if proxy: 897 proxy_uris = proxy.split() 898 for i in range(len(proxy_uris)): 899 if not proxy_uris[i].endswith('/'): 900 proxy_uris[i] += '/' 901 # URL validation 902 url = urllib.parse.urlparse(proxy_uris[i]) 903 if not (url.scheme and url.netloc): 904 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i])) 905 self.api_token = api_token 906 self._gateway_services = {} 907 self._keep_services = [{ 908 'uuid': "00000-bi6l4-%015d" % idx, 909 'service_type': 'proxy', 910 '_service_root': uri, 911 } for idx, uri in enumerate(proxy_uris)] 912 self._writable_services = self._keep_services 913 self.using_proxy = True 914 self._static_services_list = True 915 else: 916 # It's important to avoid instantiating an API client 917 # unless we actually need one, for testing's sake. 918 if api_client is None: 919 api_client = arvados.api('v1') 920 self.api_client = api_client 921 self.api_token = api_client.api_token 922 self._gateway_services = {} 923 self._keep_services = None 924 self._writable_services = None 925 self.using_proxy = None 926 self._static_services_list = False 927 try: 928 self._default_classes = [ 929 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']] 930 except KeyError: 931 # We're talking to an old cluster 932 pass
Initialize a new KeepClient.
Arguments: :api_client: The API client to use to find Keep services. If not provided, KeepClient will build one from available Arvados configuration.
:proxy: If specified, this KeepClient will send requests to this Keep proxy. Otherwise, KeepClient will fall back to the setting of the ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings. If you want to KeepClient does not use a proxy, pass in an empty string.
:timeout: The initial timeout (in seconds) for HTTP requests to Keep non-proxy servers. A tuple of three floats is interpreted as (connection_timeout, read_timeout, minimum_bandwidth). A connection will be aborted if the average traffic rate falls below minimum_bandwidth bytes per second over an interval of read_timeout seconds. Because timeouts are often a result of transient server load, the actual connection timeout will be increased by a factor of two on each retry. Default: (2, 256, 32768).
:proxy_timeout: The initial timeout (in seconds) for HTTP requests to Keep proxies. A tuple of three floats is interpreted as (connection_timeout, read_timeout, minimum_bandwidth). The behavior described above for adjusting connection timeouts on retry also applies. Default: (20, 256, 32768).
:api_token: If you’re not using an API client, but only talking directly to a Keep proxy, this parameter specifies an API token to authenticate Keep requests. It is an error to specify both api_client and api_token. If you specify neither, KeepClient will use one available from the Arvados configuration.
:local_store: If specified, this KeepClient will bypass Keep services, and save data to the named directory. If unspecified, KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE environment variable. If you want to ensure KeepClient does not use local storage, pass in an empty string. This is primarily intended to mock a server for testing.
:num_retries: The default number of times to retry failed requests. This will be used as the default num_retries value when get() and put() are called. Default 10.
934 def current_timeout(self, attempt_number): 935 """Return the appropriate timeout to use for this client. 936 937 The proxy timeout setting if the backend service is currently a proxy, 938 the regular timeout setting otherwise. The `attempt_number` indicates 939 how many times the operation has been tried already (starting from 0 940 for the first try), and scales the connection timeout portion of the 941 return value accordingly. 942 943 """ 944 # TODO(twp): the timeout should be a property of a 945 # _KeepService, not a KeepClient. See #4488. 946 t = self.proxy_timeout if self.using_proxy else self.timeout 947 if len(t) == 2: 948 return (t[0] * (1 << attempt_number), t[1]) 949 else: 950 return (t[0] * (1 << attempt_number), t[1], t[2])
Return the appropriate timeout to use for this client.
The proxy timeout setting if the backend service is currently a proxy,
the regular timeout setting otherwise. The attempt_number
indicates
how many times the operation has been tried already (starting from 0
for the first try), and scales the connection timeout portion of the
return value accordingly.
955 def build_services_list(self, force_rebuild=False): 956 if (self._static_services_list or 957 (self._keep_services and not force_rebuild)): 958 return 959 with self.lock: 960 try: 961 keep_services = self.api_client.keep_services().accessible() 962 except Exception: # API server predates Keep services. 963 keep_services = self.api_client.keep_disks().list() 964 965 # Gateway services are only used when specified by UUID, 966 # so there's nothing to gain by filtering them by 967 # service_type. 968 self._gateway_services = {ks['uuid']: ks for ks in 969 keep_services.execute()['items']} 970 if not self._gateway_services: 971 raise arvados.errors.NoKeepServersError() 972 973 # Precompute the base URI for each service. 974 for r in self._gateway_services.values(): 975 host = r['service_host'] 976 if not host.startswith('[') and host.find(':') >= 0: 977 # IPv6 URIs must be formatted like http://[::1]:80/... 978 host = '[' + host + ']' 979 r['_service_root'] = "{}://{}:{:d}/".format( 980 'https' if r['service_ssl_flag'] else 'http', 981 host, 982 r['service_port']) 983 984 _logger.debug(str(self._gateway_services)) 985 self._keep_services = [ 986 ks for ks in self._gateway_services.values() 987 if not ks.get('service_type', '').startswith('gateway:')] 988 self._writable_services = [ks for ks in self._keep_services 989 if not ks.get('read_only')] 990 991 # For disk type services, max_replicas_per_service is 1 992 # It is unknown (unlimited) for other service types. 993 if self._any_nondisk_services(self._writable_services): 994 self.max_replicas_per_service = None 995 else: 996 self.max_replicas_per_service = 1
1007 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False): 1008 """Return an array of Keep service endpoints, in the order in 1009 which they should be probed when reading or writing data with 1010 the given hash+hints. 1011 """ 1012 self.build_services_list(force_rebuild) 1013 1014 sorted_roots = [] 1015 # Use the services indicated by the given +K@... remote 1016 # service hints, if any are present and can be resolved to a 1017 # URI. 1018 for hint in locator.hints: 1019 if hint.startswith('K@'): 1020 if len(hint) == 7: 1021 sorted_roots.append( 1022 "https://keep.{}.arvadosapi.com/".format(hint[2:])) 1023 elif len(hint) == 29: 1024 svc = self._gateway_services.get(hint[2:]) 1025 if svc: 1026 sorted_roots.append(svc['_service_root']) 1027 1028 # Sort the available local services by weight (heaviest first) 1029 # for this locator, and return their service_roots (base URIs) 1030 # in that order. 1031 use_services = self._keep_services 1032 if need_writable: 1033 use_services = self._writable_services 1034 self.using_proxy = self._any_nondisk_services(use_services) 1035 sorted_roots.extend([ 1036 svc['_service_root'] for svc in sorted( 1037 use_services, 1038 reverse=True, 1039 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))]) 1040 _logger.debug("{}: {}".format(locator, sorted_roots)) 1041 return sorted_roots
Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing data with the given hash+hints.
1043 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers): 1044 # roots_map is a dictionary, mapping Keep service root strings 1045 # to _KeepService objects. Poll for Keep services, and add any 1046 # new ones to roots_map. Return the current list of local 1047 # root strings. 1048 headers.setdefault('Authorization', "Bearer %s" % (self.api_token,)) 1049 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable) 1050 for root in local_roots: 1051 if root not in roots_map: 1052 roots_map[root] = self._KeepService( 1053 root, self._user_agent_pool, 1054 upload_counter=self.upload_counter, 1055 download_counter=self.download_counter, 1056 headers=headers, 1057 insecure=self.insecure) 1058 return local_roots
1078 def get_from_cache(self, loc_s): 1079 """Fetch a block only if is in the cache, otherwise return None.""" 1080 locator = KeepLocator(loc_s) 1081 slot = self.block_cache.get(locator.md5sum) 1082 if slot is not None and slot.ready.is_set(): 1083 return slot.get() 1084 else: 1085 return None
Fetch a block only if is in the cache, otherwise return None.
1087 def refresh_signature(self, loc): 1088 """Ask Keep to get the remote block and return its local signature""" 1089 now = datetime.datetime.utcnow().isoformat("T") + 'Z' 1090 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
Ask Keep to get the remote block and return its local signature
1250 @retry.retry_method 1251 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None): 1252 """Save data in Keep. 1253 1254 This method will get a list of Keep services from the API server, and 1255 send the data to each one simultaneously in a new thread. Once the 1256 uploads are finished, if enough copies are saved, this method returns 1257 the most recent HTTP response body. If requests fail to upload 1258 enough copies, this method raises KeepWriteError. 1259 1260 Arguments: 1261 * data: The string of data to upload. 1262 * copies: The number of copies that the user requires be saved. 1263 Default 2. 1264 * num_retries: The number of times to retry PUT requests to 1265 *each* Keep server if it returns temporary failures, with 1266 exponential backoff. The default value is set when the 1267 KeepClient is initialized. 1268 * classes: An optional list of storage class names where copies should 1269 be written. 1270 """ 1271 1272 classes = classes or self._default_classes 1273 1274 if not isinstance(data, bytes): 1275 data = data.encode() 1276 1277 self.put_counter.add(1) 1278 1279 data_hash = hashlib.md5(data).hexdigest() 1280 loc_s = data_hash + '+' + str(len(data)) 1281 if copies < 1: 1282 return loc_s 1283 locator = KeepLocator(loc_s) 1284 1285 request_id = (request_id or 1286 (hasattr(self, 'api_client') and self.api_client.request_id) or 1287 arvados.util.new_request_id()) 1288 headers = { 1289 'X-Request-Id': request_id, 1290 'X-Keep-Desired-Replicas': str(copies), 1291 } 1292 roots_map = {} 1293 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1294 backoff_start=2) 1295 done_copies = 0 1296 done_classes = [] 1297 for tries_left in loop: 1298 try: 1299 sorted_roots = self.map_new_services( 1300 roots_map, locator, 1301 force_rebuild=(tries_left < num_retries), 1302 need_writable=True, 1303 headers=headers) 1304 except Exception as error: 1305 loop.save_result(error) 1306 continue 1307 1308 pending_classes = [] 1309 if done_classes is not None: 1310 pending_classes = list(set(classes) - set(done_classes)) 1311 writer_pool = KeepClient._KeepWriterThreadPool( 1312 data=data, 1313 data_hash=data_hash, 1314 copies=copies - done_copies, 1315 max_service_replicas=self.max_replicas_per_service, 1316 timeout=self.current_timeout(num_retries - tries_left), 1317 classes=pending_classes, 1318 ) 1319 for service_root, ks in [(root, roots_map[root]) 1320 for root in sorted_roots]: 1321 if ks.finished(): 1322 continue 1323 writer_pool.add_task(ks, service_root) 1324 writer_pool.join() 1325 pool_copies, pool_classes = writer_pool.done() 1326 done_copies += pool_copies 1327 if (done_classes is not None) and (pool_classes is not None): 1328 done_classes += pool_classes 1329 loop.save_result( 1330 (done_copies >= copies and set(done_classes) == set(classes), 1331 writer_pool.total_task_nr)) 1332 else: 1333 # Old keepstore contacted without storage classes support: 1334 # success is determined only by successful copies. 1335 # 1336 # Disable storage classes tracking from this point forward. 1337 if not self._storage_classes_unsupported_warning: 1338 self._storage_classes_unsupported_warning = True 1339 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster") 1340 done_classes = None 1341 loop.save_result( 1342 (done_copies >= copies, writer_pool.total_task_nr)) 1343 1344 if loop.success(): 1345 return writer_pool.response() 1346 if not roots_map: 1347 raise arvados.errors.KeepWriteError( 1348 "[{}] failed to write {}: no Keep services available ({})".format( 1349 request_id, data_hash, loop.last_result())) 1350 else: 1351 service_errors = ((key, roots_map[key].last_result()['error']) 1352 for key in sorted_roots 1353 if roots_map[key].last_result()['error']) 1354 raise arvados.errors.KeepWriteError( 1355 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format( 1356 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
Save data in Keep.
This method will get a list of Keep services from the API server, and send the data to each one simultaneously in a new thread. Once the uploads are finished, if enough copies are saved, this method returns the most recent HTTP response body. If requests fail to upload enough copies, this method raises KeepWriteError.
Arguments:
- data: The string of data to upload.
- copies: The number of copies that the user requires be saved. Default 2.
- num_retries: The number of times to retry PUT requests to each Keep server if it returns temporary failures, with exponential backoff. The default value is set when the KeepClient is initialized.
- classes: An optional list of storage class names where copies should be written.
1382 def block_prefetch(self, locator): 1383 """ 1384 This relies on the fact that KeepClient implements a block cache, 1385 so repeated requests for the same block will not result in repeated 1386 downloads (unless the block is evicted from the cache.) This method 1387 does not block. 1388 """ 1389 1390 if self.block_cache.get(locator) is not None: 1391 return 1392 1393 self._start_prefetch_threads() 1394 self._prefetch_queue.put(locator)
This relies on the fact that KeepClient implements a block cache, so repeated requests for the same block will not result in repeated downloads (unless the block is evicted from the cache.) This method does not block.
1406 def local_store_put(self, data, copies=1, num_retries=None, classes=[]): 1407 """A stub for put(). 1408 1409 This method is used in place of the real put() method when 1410 using local storage (see constructor's local_store argument). 1411 1412 copies and num_retries arguments are ignored: they are here 1413 only for the sake of offering the same call signature as 1414 put(). 1415 1416 Data stored this way can be retrieved via local_store_get(). 1417 """ 1418 md5 = hashlib.md5(data).hexdigest() 1419 locator = '%s+%d' % (md5, len(data)) 1420 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f: 1421 f.write(data) 1422 os.rename(os.path.join(self.local_store, md5 + '.tmp'), 1423 os.path.join(self.local_store, md5)) 1424 return locator
A stub for put().
This method is used in place of the real put() method when using local storage (see constructor’s local_store argument).
copies and num_retries arguments are ignored: they are here only for the sake of offering the same call signature as put().
Data stored this way can be retrieved via local_store_get().
1426 def local_store_get(self, loc_s, num_retries=None): 1427 """Companion to local_store_put().""" 1428 try: 1429 locator = KeepLocator(loc_s) 1430 except ValueError: 1431 raise arvados.errors.NotFoundError( 1432 "Invalid data locator: '%s'" % loc_s) 1433 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1434 return b'' 1435 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f: 1436 return f.read()
Companion to local_store_put().
1438 def local_store_head(self, loc_s, num_retries=None): 1439 """Companion to local_store_put().""" 1440 try: 1441 locator = KeepLocator(loc_s) 1442 except ValueError: 1443 raise arvados.errors.NotFoundError( 1444 "Invalid data locator: '%s'" % loc_s) 1445 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1446 return True 1447 if os.path.exists(os.path.join(self.local_store, locator.md5sum)): 1448 return True
Companion to local_store_put().