arvados.keep
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
import copy
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import errno
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
from . import timer
import urllib.parse
import traceback
import weakref

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
import arvados.diskcache
from arvados._pycurlhelper import PyCurlHelper

_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            # A hint with no '@' makes the tuple unpacking above raise
            # ValueError, so catch that here as well.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
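
# Illustrative sketch (not part of the original module): how KeepLocator
# decomposes a signed locator. The hash, size, signature, and expiry below
# are made-up values.
#
#     loc = KeepLocator(
#         'acbd18db4cc2f85cedef654fccc4a4d8+3'
#         '+A0123456789abcdef0123456789abcdef01234567@565e4840')
#     loc.md5sum      # 'acbd18db4cc2f85cedef654fccc4a4d8'
#     loc.size        # 3
#     loc.stripped()  # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
#     str(loc)        # reassembles hash, size, and permission hint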


class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
    own API client. The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
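
# Illustrative sketch (not part of the original module): the deprecation
# notice above recommends constructing a KeepClient directly instead of
# going through this global helper, e.g.:
#
#     api = arvados.api('v1')            # build an API client explicitly
#     keep = KeepClient(api_client=api)
#     locator = keep.put(b'foo')
#     data = keep.get(locator)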

class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles. This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks. This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        self.cache_total = 0
        if self._disk_cache:
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()
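
    # Illustrative worked example (not part of the original module) of the
    # default disk cache sizing above, assuming a 100 GiB filesystem with
    # 40 GiB free, no existing cache, and the default 128 slots:
    #
    #     maxdisk   = 100 GiB * 0.10               = 10 GiB
    #     avail     = (40 GiB + 0) / 4             = 10 GiB
    #     slots cap = 128 * 64 MiB                 =  8 GiB
    #     cache_max = min(10 GiB, 10 GiB, 8 GiB)   =  8 GiB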

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None


    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                continue

            sz = slot.size()
            slot.evict()
            self.cache_total -= sz
            del self._cache[slot.locator]


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots. Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            pass
        finally:
            # Check if we should evict things from the cache. Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again. Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()
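
# Illustrative sketch (not part of the original module) of the intended
# reserve/fill/read flow for KeepBlockCache, with a made-up locator and a
# hypothetical fetch helper:
#
#     cache = KeepBlockCache()
#     slot, first = cache.reserve_cache('acbd18db4cc2f85cedef654fccc4a4d8')
#     if first:
#         cache.set(slot, fetch_block_somehow())  # hypothetical helper
#     data = slot.get()    # blocks until some thread calls set()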

class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super(KeepClient.KeepService, self).__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']
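
        # Illustrative note (not part of the original module) on get()'s
        # return values: for GET it returns the checksum-verified block
        # bytes; for HEAD it returns True (or, after a remote block copy
        # request, the local locator from the 'x-keep-locator' header);
        # on failure it returns None and leaves details in last_result().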

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue().decode('utf-8'),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True


    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
                return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()
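
    # Illustrative worked example (not part of the original module) of the
    # KeepWriterQueue bookkeeping above: with wanted_copies=2 and
    # classes=['default', 'archive'], a response confirming {'default': 2}
    # leaves pending_classes() == ['archive'], so the outer write loop
    # keeps scheduling tasks until 'archive' also reaches 2 confirmed
    # copies (or the services are exhausted).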


    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response


    class KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout,
                                       headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
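
    # Illustrative worked example (not part of the original module) of the
    # pool sizing above: disk services report max_service_replicas == 1, so
    # writing copies=3 yields ceil(3/1) == 3 writer threads, while a proxy
    # (max_service_replicas unknown/None) always gets a single thread.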


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
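
    # Illustrative sketch (not part of the original module): constructing a
    # client with explicit timeout tuples, per the docstring above. 'api'
    # stands in for a hypothetical arvados.api('v1') client.
    #
    #     keep = KeepClient(api_client=api,
    #                       timeout=(2, 256, 32768),         # non-proxy servers
    #                       proxy_timeout=(20, 256, 32768),  # keepproxy
    #                       num_retries=10)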

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
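
    # Illustrative sketch (not part of the original module) of the
    # rendezvous hashing behind _service_weight(): every client computes
    # the same per-service weights for a given block, so probe order is
    # stable without central coordination. The UUIDs below are made up.
    #
    #     h = 'acbd18db4cc2f85cedef654fccc4a4d8'
    #     services = ['zzzzz-bi6l4-000000000000000',
    #                 'zzzzz-bi6l4-000000000000001']
    #     order = sorted(
    #         services, reverse=True,
    #         key=lambda u: hashlib.md5((h + u[-15:]).encode()).hexdigest())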

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # this is a request for a prefetch to fill in
                        # the cache, don't need to wait for the
                        # result, so if it is already in flight return
                        # immediately. Clear 'slot' to prevent
                        # finally block from calling slot.set()
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:])
                               )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation. Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            roots_map = {}
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
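
    # Illustrative sketch (not part of the original module): fetching and
    # probing a single block through the retry machinery above. The
    # locator is made up (it happens to be md5('foo') plus a size hint).
    #
    #     data = keep.get('acbd18db4cc2f85cedef654fccc4a4d8+3')  # -> b'foo'
    #     ok = keep.head('acbd18db4cc2f85cedef654fccc4a4d8+3')   # -> True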

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done_copies,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left),
                                                          classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                     writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
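
    # Illustrative sketch (not part of the original module): a successful
    # put() returns the most recent response body, which is normally a
    # signed locator. The class name below is made up.
    #
    #     locator = keep.put(b'foo', copies=2, classes=['archive'])
    #     # e.g. 'acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>'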

    def _block_prefetch_worker(self):
        """The background downloader thread."""
        while True:
            try:
                b = self._prefetch_queue.get()
                if b is None:
                    return
                self.get(b, prefetch=True)
            except Exception:
                _logger.exception("Exception doing block prefetch")

    def _start_prefetch_threads(self):
        if self._prefetch_threads is None:
            with self.lock:
                if self._prefetch_threads is not None:
                    return
                self._prefetch_queue = queue.Queue()
                self._prefetch_threads = []
                for i in range(0, self.num_prefetch_threads):
                    thread = threading.Thread(target=self._block_prefetch_worker)
                    self._prefetch_threads.append(thread)
                    thread.daemon = True
                    thread.start()

    def block_prefetch(self, locator):
        """Queue a background download of the named block.

        This relies on the fact that KeepClient implements a block cache,
        so repeated requests for the same block will not result in repeated
        downloads (unless the block is evicted from the cache.) This method
        does not block.
        """

        if self.block_cache.get(locator) is not None:
            return

        self._start_prefetch_threads()
        self._prefetch_queue.put(locator)

    def stop_prefetch_threads(self):
        with self.lock:
            if self._prefetch_threads is not None:
                for t in self._prefetch_threads:
                    self._prefetch_queue.put(None)
                for t in self._prefetch_threads:
                    t.join()
            self._prefetch_threads = None
            self._prefetch_queue = None
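
    # Illustrative sketch (not part of the original module): warming the
    # block cache ahead of sequential reads. The locator is made up.
    #
    #     keep.block_prefetch('acbd18db4cc2f85cedef654fccc4a4d8+3')
    #     ...                  # do other work while it downloads
    #     data = keep.get('acbd18db4cc2f85cedef654fccc4a4d8+3')  # cache hit
    #     keep.stop_prefetch_threads()  # shut down workers when done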

    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True
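
# Illustrative sketch (not part of the original module): the local-store
# stubs above are wired in by the constructor when local storage is
# requested, which is mainly useful for tests. The directory is made up.
#
#     os.environ['KEEP_LOCAL_STORE'] = '/tmp/keep-test'
#     kc = KeepClient()        # get/put/head now use the local directory
#     loc = kc.put(b'foo')     # writes /tmp/keep-test/acbd18db...
#     kc.get(loc)              # -> b'foo'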
65class KeepLocator(object): 66 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0) 67 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$') 68 69 def __init__(self, locator_str): 70 self.hints = [] 71 self._perm_sig = None 72 self._perm_expiry = None 73 pieces = iter(locator_str.split('+')) 74 self.md5sum = next(pieces) 75 try: 76 self.size = int(next(pieces)) 77 except StopIteration: 78 self.size = None 79 for hint in pieces: 80 if self.HINT_RE.match(hint) is None: 81 raise ValueError("invalid hint format: {}".format(hint)) 82 elif hint.startswith('A'): 83 self.parse_permission_hint(hint) 84 else: 85 self.hints.append(hint) 86 87 def __str__(self): 88 return '+'.join( 89 native_str(s) 90 for s in [self.md5sum, self.size, 91 self.permission_hint()] + self.hints 92 if s is not None) 93 94 def stripped(self): 95 if self.size is not None: 96 return "%s+%i" % (self.md5sum, self.size) 97 else: 98 return self.md5sum 99 100 def _make_hex_prop(name, length): 101 # Build and return a new property with the given name that 102 # must be a hex string of the given length. 103 data_name = '_{}'.format(name) 104 def getter(self): 105 return getattr(self, data_name) 106 def setter(self, hex_str): 107 if not arvados.util.is_hex(hex_str, length): 108 raise ValueError("{} is not a {}-digit hex string: {!r}". 109 format(name, length, hex_str)) 110 setattr(self, data_name, hex_str) 111 return property(getter, setter) 112 113 md5sum = _make_hex_prop('md5sum', 32) 114 perm_sig = _make_hex_prop('perm_sig', 40) 115 116 @property 117 def perm_expiry(self): 118 return self._perm_expiry 119 120 @perm_expiry.setter 121 def perm_expiry(self, value): 122 if not arvados.util.is_hex(value, 1, 8): 123 raise ValueError( 124 "permission timestamp must be a hex Unix timestamp: {}". 125 format(value)) 126 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16)) 127 128 def permission_hint(self): 129 data = [self.perm_sig, self.perm_expiry] 130 if None in data: 131 return None 132 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds()) 133 return "A{}@{:08x}".format(*data) 134 135 def parse_permission_hint(self, s): 136 try: 137 self.perm_sig, self.perm_expiry = s[1:].split('@', 1) 138 except IndexError: 139 raise ValueError("bad permission hint {}".format(s)) 140 141 def permission_expired(self, as_of_dt=None): 142 if self.perm_expiry is None: 143 return False 144 elif as_of_dt is None: 145 as_of_dt = datetime.datetime.now() 146 return self.perm_expiry <= as_of_dt
69 def __init__(self, locator_str): 70 self.hints = [] 71 self._perm_sig = None 72 self._perm_expiry = None 73 pieces = iter(locator_str.split('+')) 74 self.md5sum = next(pieces) 75 try: 76 self.size = int(next(pieces)) 77 except StopIteration: 78 self.size = None 79 for hint in pieces: 80 if self.HINT_RE.match(hint) is None: 81 raise ValueError("invalid hint format: {}".format(hint)) 82 elif hint.startswith('A'): 83 self.parse_permission_hint(hint) 84 else: 85 self.hints.append(hint)
149class Keep(object): 150 """Simple interface to a global KeepClient object. 151 152 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your 153 own API client. The global KeepClient will build an API client from the 154 current Arvados configuration, which may not match the one you built. 155 """ 156 _last_key = None 157 158 @classmethod 159 def global_client_object(cls): 160 global global_client_object 161 # Previously, KeepClient would change its behavior at runtime based 162 # on these configuration settings. We simulate that behavior here 163 # by checking the values and returning a new KeepClient if any of 164 # them have changed. 165 key = (config.get('ARVADOS_API_HOST'), 166 config.get('ARVADOS_API_TOKEN'), 167 config.flag_is_true('ARVADOS_API_HOST_INSECURE'), 168 config.get('ARVADOS_KEEP_PROXY'), 169 os.environ.get('KEEP_LOCAL_STORE')) 170 if (global_client_object is None) or (cls._last_key != key): 171 global_client_object = KeepClient() 172 cls._last_key = key 173 return global_client_object 174 175 @staticmethod 176 def get(locator, **kwargs): 177 return Keep.global_client_object().get(locator, **kwargs) 178 179 @staticmethod 180 def put(data, **kwargs): 181 return Keep.global_client_object().put(data, **kwargs)
Simple interface to a global KeepClient object.
THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your own API client. The global KeepClient will build an API client from the current Arvados configuration, which may not match the one you built.
158 @classmethod 159 def global_client_object(cls): 160 global global_client_object 161 # Previously, KeepClient would change its behavior at runtime based 162 # on these configuration settings. We simulate that behavior here 163 # by checking the values and returning a new KeepClient if any of 164 # them have changed. 165 key = (config.get('ARVADOS_API_HOST'), 166 config.get('ARVADOS_API_TOKEN'), 167 config.flag_is_true('ARVADOS_API_HOST_INSECURE'), 168 config.get('ARVADOS_KEEP_PROXY'), 169 os.environ.get('KEEP_LOCAL_STORE')) 170 if (global_client_object is None) or (cls._last_key != key): 171 global_client_object = KeepClient() 172 cls._last_key = key 173 return global_client_object
183class KeepBlockCache(object): 184 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None): 185 self.cache_max = cache_max 186 self._cache = collections.OrderedDict() 187 self._cache_lock = threading.Lock() 188 self._max_slots = max_slots 189 self._disk_cache = disk_cache 190 self._disk_cache_dir = disk_cache_dir 191 self._cache_updating = threading.Condition(self._cache_lock) 192 193 if self._disk_cache and self._disk_cache_dir is None: 194 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep") 195 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True) 196 197 if self._max_slots == 0: 198 if self._disk_cache: 199 # Each block uses two file descriptors, one used to 200 # open it initially and hold the flock(), and a second 201 # hidden one used by mmap(). 202 # 203 # Set max slots to 1/8 of maximum file handles. This 204 # means we'll use at most 1/4 of total file handles. 205 # 206 # NOFILE typically defaults to 1024 on Linux so this 207 # is 128 slots (256 file handles), which means we can 208 # cache up to 8 GiB of 64 MiB blocks. This leaves 209 # 768 file handles for sockets and other stuff. 210 # 211 # When we want the ability to have more cache (e.g. in 212 # arv-mount) we'll increase rlimit before calling 213 # this. 214 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8) 215 else: 216 # RAM cache slots 217 self._max_slots = 512 218 219 if self.cache_max == 0: 220 if self._disk_cache: 221 fs = os.statvfs(self._disk_cache_dir) 222 # Calculation of available space incorporates existing cache usage 223 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir) 224 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4 225 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10) 226 # pick smallest of: 227 # 10% of total disk size 228 # 25% of available space 229 # max_slots * 64 MiB 230 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024)) 231 else: 232 # 256 MiB in RAM 233 self.cache_max = (256 * 1024 * 1024) 234 235 self.cache_max = max(self.cache_max, 64 * 1024 * 1024) 236 237 self.cache_total = 0 238 if self._disk_cache: 239 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots) 240 for slot in self._cache.values(): 241 self.cache_total += slot.size() 242 self.cap_cache() 243 244 class CacheSlot(object): 245 __slots__ = ("locator", "ready", "content") 246 247 def __init__(self, locator): 248 self.locator = locator 249 self.ready = threading.Event() 250 self.content = None 251 252 def get(self): 253 self.ready.wait() 254 return self.content 255 256 def set(self, value): 257 if self.content is not None: 258 return False 259 self.content = value 260 self.ready.set() 261 return True 262 263 def size(self): 264 if self.content is None: 265 return 0 266 else: 267 return len(self.content) 268 269 def evict(self): 270 self.content = None 271 272 273 def _resize_cache(self, cache_max, max_slots): 274 # Try and make sure the contents of the cache do not exceed 275 # the supplied maximums. 
276 277 if self.cache_total <= cache_max and len(self._cache) <= max_slots: 278 return 279 280 _evict_candidates = collections.deque(self._cache.values()) 281 while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots): 282 slot = _evict_candidates.popleft() 283 if not slot.ready.is_set(): 284 continue 285 286 sz = slot.size() 287 slot.evict() 288 self.cache_total -= sz 289 del self._cache[slot.locator] 290 291 292 def cap_cache(self): 293 '''Cap the cache size to self.cache_max''' 294 with self._cache_updating: 295 self._resize_cache(self.cache_max, self._max_slots) 296 self._cache_updating.notify_all() 297 298 def _get(self, locator): 299 # Test if the locator is already in the cache 300 if locator in self._cache: 301 n = self._cache[locator] 302 if n.ready.is_set() and n.content is None: 303 del self._cache[n.locator] 304 return None 305 self._cache.move_to_end(locator) 306 return n 307 if self._disk_cache: 308 # see if it exists on disk 309 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir) 310 if n is not None: 311 self._cache[n.locator] = n 312 self.cache_total += n.size() 313 return n 314 return None 315 316 def get(self, locator): 317 with self._cache_lock: 318 return self._get(locator) 319 320 def reserve_cache(self, locator): 321 '''Reserve a cache slot for the specified locator, 322 or return the existing slot.''' 323 with self._cache_updating: 324 n = self._get(locator) 325 if n: 326 return n, False 327 else: 328 # Add a new cache slot for the locator 329 self._resize_cache(self.cache_max, self._max_slots-1) 330 while len(self._cache) >= self._max_slots: 331 # If there isn't a slot available, need to wait 332 # for something to happen that releases one of the 333 # cache slots. Idle for 200 ms or woken up by 334 # another thread 335 self._cache_updating.wait(timeout=0.2) 336 self._resize_cache(self.cache_max, self._max_slots-1) 337 338 if self._disk_cache: 339 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir) 340 else: 341 n = KeepBlockCache.CacheSlot(locator) 342 self._cache[n.locator] = n 343 return n, True 344 345 def set(self, slot, blob): 346 try: 347 if slot.set(blob): 348 self.cache_total += slot.size() 349 return 350 except OSError as e: 351 if e.errno == errno.ENOMEM: 352 # Reduce max slots to current - 4, cap cache and retry 353 with self._cache_lock: 354 self._max_slots = max(4, len(self._cache) - 4) 355 elif e.errno == errno.ENOSPC: 356 # Reduce disk max space to current - 256 MiB, cap cache and retry 357 with self._cache_lock: 358 sm = sum(st.size() for st in self._cache.values()) 359 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024)) 360 elif e.errno == errno.ENODEV: 361 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.") 362 except Exception as e: 363 pass 364 finally: 365 # Check if we should evict things from the cache. Either 366 # because we added a new thing or there was an error and 367 # we possibly adjusted the limits down, so we might need 368 # to push something out. 369 self.cap_cache() 370 371 try: 372 # Only gets here if there was an error the first time. The 373 # exception handler adjusts limits downward in some cases 374 # to free up resources, which would make the operation 375 # succeed. 376 if slot.set(blob): 377 self.cache_total += slot.size() 378 except Exception as e: 379 # It failed again. Give up. 
380 slot.set(None) 381 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e)) 382 383 self.cap_cache()
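The reserve_cache()/set()/get() protocol above is how concurrent readers coordinate: the first caller gets a fresh slot back (first is True) and is responsible for filling it, while every later caller receives the same slot and blocks in get() until the content arrives. A minimal sketch of a reader following that protocol; fetch_block is a stand-in for a real Keep download, not part of this module:

import hashlib
import arvados.keep

cache = arvados.keep.KeepBlockCache(cache_max=64 * 1024 * 1024)

def cached_read(md5sum, fetch_block):
    slot, first = cache.reserve_cache(md5sum)
    if not first:
        # Another thread is (or was) fetching this block; wait for it.
        return slot.get()
    try:
        blob = fetch_block(md5sum)
    except Exception:
        cache.set(slot, None)    # unblock any waiters even on failure
        raise
    cache.set(slot, blob)        # store the content and wake up waiters
    return blob

data = b"example block"
print(cached_read(hashlib.md5(data).hexdigest(), lambda _: data))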
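_get() above calls move_to_end() on every cache hit, so the OrderedDict doubles as an LRU list and _resize_cache() can evict from the left (least recently used) end. The same idea reduced to plain strings:

from collections import OrderedDict

lru = OrderedDict.fromkeys(["a", "b", "c"])   # insertion order: oldest first
lru.move_to_end("a")                          # "a" was just used
print(next(iter(lru)))                        # -> "b", the first eviction candidate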
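KeepClient below keeps transfer statistics in Counter objects (for example self.upload_counter = Counter() and download_counter.add(n)); the helper's definition falls outside this excerpt. A minimal thread-safe sketch consistent with those call sites:

import threading

class Counter(object):
    # Thread-safe accumulator; a sketch inferred from the call sites
    # below, not the module's own definition.
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val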
399class KeepClient(object): 400 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT 401 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT 402 403 class KeepService(PyCurlHelper): 404 """Make requests to a single Keep service, and track results. 405 406 A KeepService is intended to last long enough to perform one 407 transaction (GET or PUT) against one Keep service. This can 408 involve calling either get() or put() multiple times in order 409 to retry after transient failures. However, calling both get() 410 and put() on a single instance -- or using the same instance 411 to access two different Keep services -- will not produce 412 sensible behavior. 413 """ 414 415 HTTP_ERRORS = ( 416 socket.error, 417 ssl.SSLError, 418 arvados.errors.HttpError, 419 ) 420 421 def __init__(self, root, user_agent_pool=queue.LifoQueue(), 422 upload_counter=None, 423 download_counter=None, 424 headers={}, 425 insecure=False): 426 super(KeepClient.KeepService, self).__init__() 427 self.root = root 428 self._user_agent_pool = user_agent_pool 429 self._result = {'error': None} 430 self._usable = True 431 self._session = None 432 self._socket = None 433 self.get_headers = {'Accept': 'application/octet-stream'} 434 self.get_headers.update(headers) 435 self.put_headers = headers 436 self.upload_counter = upload_counter 437 self.download_counter = download_counter 438 self.insecure = insecure 439 440 def usable(self): 441 """Is it worth attempting a request?""" 442 return self._usable 443 444 def finished(self): 445 """Did the request succeed or encounter permanent failure?""" 446 return self._result['error'] == False or not self._usable 447 448 def last_result(self): 449 return self._result 450 451 def _get_user_agent(self): 452 try: 453 return self._user_agent_pool.get(block=False) 454 except queue.Empty: 455 return pycurl.Curl() 456 457 def _put_user_agent(self, ua): 458 try: 459 ua.reset() 460 self._user_agent_pool.put(ua, block=False) 461 except: 462 ua.close() 463 464 def get(self, locator, method="GET", timeout=None): 465 # locator is a KeepLocator object. 
466 url = self.root + str(locator) 467 _logger.debug("Request: %s %s", method, url) 468 curl = self._get_user_agent() 469 ok = None 470 try: 471 with timer.Timer() as t: 472 self._headers = {} 473 response_body = BytesIO() 474 curl.setopt(pycurl.NOSIGNAL, 1) 475 curl.setopt(pycurl.OPENSOCKETFUNCTION, 476 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 477 curl.setopt(pycurl.URL, url.encode('utf-8')) 478 curl.setopt(pycurl.HTTPHEADER, [ 479 '{}: {}'.format(k,v) for k,v in self.get_headers.items()]) 480 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 481 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 482 if self.insecure: 483 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 484 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 485 else: 486 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 487 if method == "HEAD": 488 curl.setopt(pycurl.NOBODY, True) 489 else: 490 curl.setopt(pycurl.HTTPGET, True) 491 self._setcurltimeouts(curl, timeout, method=="HEAD") 492 493 try: 494 curl.perform() 495 except Exception as e: 496 raise arvados.errors.HttpError(0, str(e)) 497 finally: 498 if self._socket: 499 self._socket.close() 500 self._socket = None 501 self._result = { 502 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 503 'body': response_body.getvalue(), 504 'headers': self._headers, 505 'error': False, 506 } 507 508 ok = retry.check_http_response_success(self._result['status_code']) 509 if not ok: 510 self._result['error'] = arvados.errors.HttpError( 511 self._result['status_code'], 512 self._headers.get('x-status-line', 'Error')) 513 except self.HTTP_ERRORS as e: 514 self._result = { 515 'error': e, 516 } 517 self._usable = ok != False 518 if self._result.get('status_code', None): 519 # The client worked well enough to get an HTTP status 520 # code, so presumably any problems are just on the 521 # server side and it's OK to reuse the client. 522 self._put_user_agent(curl) 523 else: 524 # Don't return this client to the pool, in case it's 525 # broken. 526 curl.close() 527 if not ok: 528 _logger.debug("Request fail: GET %s => %s: %s", 529 url, type(self._result['error']), str(self._result['error'])) 530 return None 531 if method == "HEAD": 532 _logger.info("HEAD %s: %s bytes", 533 self._result['status_code'], 534 self._result.get('content-length')) 535 if self._result['headers'].get('x-keep-locator'): 536 # This is a response to a remote block copy request, return 537 # the local copy block locator. 
538 return self._result['headers'].get('x-keep-locator') 539 return True 540 541 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)", 542 self._result['status_code'], 543 len(self._result['body']), 544 t.msecs, 545 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0) 546 547 if self.download_counter: 548 self.download_counter.add(len(self._result['body'])) 549 resp_md5 = hashlib.md5(self._result['body']).hexdigest() 550 if resp_md5 != locator.md5sum: 551 _logger.warning("Checksum fail: md5(%s) = %s", 552 url, resp_md5) 553 self._result['error'] = arvados.errors.HttpError( 554 0, 'Checksum fail') 555 return None 556 return self._result['body'] 557 558 def put(self, hash_s, body, timeout=None, headers={}): 559 put_headers = copy.copy(self.put_headers) 560 put_headers.update(headers) 561 url = self.root + hash_s 562 _logger.debug("Request: PUT %s", url) 563 curl = self._get_user_agent() 564 ok = None 565 try: 566 with timer.Timer() as t: 567 self._headers = {} 568 body_reader = BytesIO(body) 569 response_body = BytesIO() 570 curl.setopt(pycurl.NOSIGNAL, 1) 571 curl.setopt(pycurl.OPENSOCKETFUNCTION, 572 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 573 curl.setopt(pycurl.URL, url.encode('utf-8')) 574 # Using UPLOAD tells cURL to wait for a "go ahead" from the 575 # Keep server (in the form of a HTTP/1.1 "100 Continue" 576 # response) instead of sending the request body immediately. 577 # This allows the server to reject the request if the request 578 # is invalid or the server is read-only, without waiting for 579 # the client to send the entire block. 580 curl.setopt(pycurl.UPLOAD, True) 581 curl.setopt(pycurl.INFILESIZE, len(body)) 582 curl.setopt(pycurl.READFUNCTION, body_reader.read) 583 curl.setopt(pycurl.HTTPHEADER, [ 584 '{}: {}'.format(k,v) for k,v in put_headers.items()]) 585 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 586 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 587 if self.insecure: 588 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 589 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 590 else: 591 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 592 self._setcurltimeouts(curl, timeout) 593 try: 594 curl.perform() 595 except Exception as e: 596 raise arvados.errors.HttpError(0, str(e)) 597 finally: 598 if self._socket: 599 self._socket.close() 600 self._socket = None 601 self._result = { 602 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 603 'body': response_body.getvalue().decode('utf-8'), 604 'headers': self._headers, 605 'error': False, 606 } 607 ok = retry.check_http_response_success(self._result['status_code']) 608 if not ok: 609 self._result['error'] = arvados.errors.HttpError( 610 self._result['status_code'], 611 self._headers.get('x-status-line', 'Error')) 612 except self.HTTP_ERRORS as e: 613 self._result = { 614 'error': e, 615 } 616 self._usable = ok != False # still usable if ok is True or None 617 if self._result.get('status_code', None): 618 # Client is functional. See comment in get(). 
619 self._put_user_agent(curl) 620 else: 621 curl.close() 622 if not ok: 623 _logger.debug("Request fail: PUT %s => %s: %s", 624 url, type(self._result['error']), str(self._result['error'])) 625 return False 626 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)", 627 self._result['status_code'], 628 len(body), 629 t.msecs, 630 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0) 631 if self.upload_counter: 632 self.upload_counter.add(len(body)) 633 return True 634 635 636 class KeepWriterQueue(queue.Queue): 637 def __init__(self, copies, classes=[]): 638 queue.Queue.__init__(self) # Old-style superclass 639 self.wanted_copies = copies 640 self.wanted_storage_classes = classes 641 self.successful_copies = 0 642 self.confirmed_storage_classes = {} 643 self.response = None 644 self.storage_classes_tracking = True 645 self.queue_data_lock = threading.RLock() 646 self.pending_tries = max(copies, len(classes)) 647 self.pending_tries_notification = threading.Condition() 648 649 def write_success(self, response, replicas_nr, classes_confirmed): 650 with self.queue_data_lock: 651 self.successful_copies += replicas_nr 652 if classes_confirmed is None: 653 self.storage_classes_tracking = False 654 elif self.storage_classes_tracking: 655 for st_class, st_copies in classes_confirmed.items(): 656 try: 657 self.confirmed_storage_classes[st_class] += st_copies 658 except KeyError: 659 self.confirmed_storage_classes[st_class] = st_copies 660 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes())) 661 self.response = response 662 with self.pending_tries_notification: 663 self.pending_tries_notification.notify_all() 664 665 def write_fail(self, ks): 666 with self.pending_tries_notification: 667 self.pending_tries += 1 668 self.pending_tries_notification.notify() 669 670 def pending_copies(self): 671 with self.queue_data_lock: 672 return self.wanted_copies - self.successful_copies 673 674 def satisfied_classes(self): 675 with self.queue_data_lock: 676 if not self.storage_classes_tracking: 677 # Notifies disabled storage classes expectation to 678 # the outer loop. 
679 return None 680 return list(set(self.wanted_storage_classes) - set(self.pending_classes())) 681 682 def pending_classes(self): 683 with self.queue_data_lock: 684 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None): 685 return [] 686 unsatisfied_classes = copy.copy(self.wanted_storage_classes) 687 for st_class, st_copies in self.confirmed_storage_classes.items(): 688 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies: 689 unsatisfied_classes.remove(st_class) 690 return unsatisfied_classes 691 692 def get_next_task(self): 693 with self.pending_tries_notification: 694 while True: 695 if self.pending_copies() < 1 and len(self.pending_classes()) == 0: 696 # This notify_all() is unnecessary -- 697 # write_success() already called notify_all() 698 # when pending<1 became true, so it's not 699 # possible for any other thread to be in 700 # wait() now -- but it's cheap insurance 701 # against deadlock so we do it anyway: 702 self.pending_tries_notification.notify_all() 703 # Drain the queue and then raise Queue.Empty 704 while True: 705 self.get_nowait() 706 self.task_done() 707 elif self.pending_tries > 0: 708 service, service_root = self.get_nowait() 709 if service.finished(): 710 self.task_done() 711 continue 712 self.pending_tries -= 1 713 return service, service_root 714 elif self.empty(): 715 self.pending_tries_notification.notify_all() 716 raise queue.Empty 717 else: 718 self.pending_tries_notification.wait() 719 720 721 class KeepWriterThreadPool(object): 722 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]): 723 self.total_task_nr = 0 724 if (not max_service_replicas) or (max_service_replicas >= copies): 725 num_threads = 1 726 else: 727 num_threads = int(math.ceil(1.0*copies/max_service_replicas)) 728 _logger.debug("Pool max threads is %d", num_threads) 729 self.workers = [] 730 self.queue = KeepClient.KeepWriterQueue(copies, classes) 731 # Create workers 732 for _ in range(num_threads): 733 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout) 734 self.workers.append(w) 735 736 def add_task(self, ks, service_root): 737 self.queue.put((ks, service_root)) 738 self.total_task_nr += 1 739 740 def done(self): 741 return self.queue.successful_copies, self.queue.satisfied_classes() 742 743 def join(self): 744 # Start workers 745 for worker in self.workers: 746 worker.start() 747 # Wait for finished work 748 self.queue.join() 749 750 def response(self): 751 return self.queue.response 752 753 754 class KeepWriterThread(threading.Thread): 755 class TaskFailed(RuntimeError): pass 756 757 def __init__(self, queue, data, data_hash, timeout=None): 758 super(KeepClient.KeepWriterThread, self).__init__() 759 self.timeout = timeout 760 self.queue = queue 761 self.data = data 762 self.data_hash = data_hash 763 self.daemon = True 764 765 def run(self): 766 while True: 767 try: 768 service, service_root = self.queue.get_next_task() 769 except queue.Empty: 770 return 771 try: 772 locator, copies, classes = self.do_task(service, service_root) 773 except Exception as e: 774 if not isinstance(e, self.TaskFailed): 775 _logger.exception("Exception in KeepWriterThread") 776 self.queue.write_fail(service) 777 else: 778 self.queue.write_success(locator, copies, classes) 779 finally: 780 self.queue.task_done() 781 782 def do_task(self, service, service_root): 783 classes = self.queue.pending_classes() 784 headers = {} 785 if len(classes) > 0: 786 classes.sort() 787 headers['X-Keep-Storage-Classes'] = ', 
'.join(classes) 788 success = bool(service.put(self.data_hash, 789 self.data, 790 timeout=self.timeout, 791 headers=headers)) 792 result = service.last_result() 793 794 if not success: 795 if result.get('status_code'): 796 _logger.debug("Request fail: PUT %s => %s %s", 797 self.data_hash, 798 result.get('status_code'), 799 result.get('body')) 800 raise self.TaskFailed() 801 802 _logger.debug("KeepWriterThread %s succeeded %s+%i %s", 803 str(threading.current_thread()), 804 self.data_hash, 805 len(self.data), 806 service_root) 807 try: 808 replicas_stored = int(result['headers']['x-keep-replicas-stored']) 809 except (KeyError, ValueError): 810 replicas_stored = 1 811 812 classes_confirmed = {} 813 try: 814 scch = result['headers']['x-keep-storage-classes-confirmed'] 815 for confirmation in scch.replace(' ', '').split(','): 816 if '=' in confirmation: 817 stored_class, stored_copies = confirmation.split('=')[:2] 818 classes_confirmed[stored_class] = int(stored_copies) 819 except (KeyError, ValueError): 820 # Storage classes confirmed header missing or corrupt 821 classes_confirmed = None 822 823 return result['body'].strip(), replicas_stored, classes_confirmed 824 825 826 def __init__(self, api_client=None, proxy=None, 827 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT, 828 api_token=None, local_store=None, block_cache=None, 829 num_retries=10, session=None, num_prefetch_threads=None): 830 """Initialize a new KeepClient. 831 832 Arguments: 833 :api_client: 834 The API client to use to find Keep services. If not 835 provided, KeepClient will build one from available Arvados 836 configuration. 837 838 :proxy: 839 If specified, this KeepClient will send requests to this Keep 840 proxy. Otherwise, KeepClient will fall back to the setting of the 841 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings. 842 If you want to ensure KeepClient does not use a proxy, pass in an 843 empty string. 844 845 :timeout: 846 The initial timeout (in seconds) for HTTP requests to Keep 847 non-proxy servers. A tuple of three floats is interpreted as 848 (connection_timeout, read_timeout, minimum_bandwidth). A connection 849 will be aborted if the average traffic rate falls below 850 minimum_bandwidth bytes per second over an interval of read_timeout 851 seconds. Because timeouts are often a result of transient server 852 load, the actual connection timeout will be increased by a factor 853 of two on each retry. 854 Default: (2, 256, 32768). 855 856 :proxy_timeout: 857 The initial timeout (in seconds) for HTTP requests to 858 Keep proxies. A tuple of three floats is interpreted as 859 (connection_timeout, read_timeout, minimum_bandwidth). The behavior 860 described above for adjusting connection timeouts on retry also 861 applies. 862 Default: (20, 256, 32768). 863 864 :api_token: 865 If you're not using an API client, but only talking 866 directly to a Keep proxy, this parameter specifies an API token 867 to authenticate Keep requests. It is an error to specify both 868 api_client and api_token. If you specify neither, KeepClient 869 will use one available from the Arvados configuration. 870 871 :local_store: 872 If specified, this KeepClient will bypass Keep 873 services, and save data to the named directory. If unspecified, 874 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE 875 environment variable. If you want to ensure KeepClient does not 876 use local storage, pass in an empty string. This is primarily 877 intended to mock a server for testing.
878 879 :num_retries: 880 The default number of times to retry failed requests. 881 This will be used as the default num_retries value when get() and 882 put() are called. Default 10. 883 """ 884 self.lock = threading.Lock() 885 if proxy is None: 886 if config.get('ARVADOS_KEEP_SERVICES'): 887 proxy = config.get('ARVADOS_KEEP_SERVICES') 888 else: 889 proxy = config.get('ARVADOS_KEEP_PROXY') 890 if api_token is None: 891 if api_client is None: 892 api_token = config.get('ARVADOS_API_TOKEN') 893 else: 894 api_token = api_client.api_token 895 elif api_client is not None: 896 raise ValueError( 897 "can't build KeepClient with both API client and token") 898 if local_store is None: 899 local_store = os.environ.get('KEEP_LOCAL_STORE') 900 901 if api_client is None: 902 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE') 903 else: 904 self.insecure = api_client.insecure 905 906 self.block_cache = block_cache if block_cache else KeepBlockCache() 907 self.timeout = timeout 908 self.proxy_timeout = proxy_timeout 909 self._user_agent_pool = queue.LifoQueue() 910 self.upload_counter = Counter() 911 self.download_counter = Counter() 912 self.put_counter = Counter() 913 self.get_counter = Counter() 914 self.hits_counter = Counter() 915 self.misses_counter = Counter() 916 self._storage_classes_unsupported_warning = False 917 self._default_classes = [] 918 if num_prefetch_threads is not None: 919 self.num_prefetch_threads = num_prefetch_threads 920 else: 921 self.num_prefetch_threads = 2 922 self._prefetch_queue = None 923 self._prefetch_threads = None 924 925 if local_store: 926 self.local_store = local_store 927 self.head = self.local_store_head 928 self.get = self.local_store_get 929 self.put = self.local_store_put 930 else: 931 self.num_retries = num_retries 932 self.max_replicas_per_service = None 933 if proxy: 934 proxy_uris = proxy.split() 935 for i in range(len(proxy_uris)): 936 if not proxy_uris[i].endswith('/'): 937 proxy_uris[i] += '/' 938 # URL validation 939 url = urllib.parse.urlparse(proxy_uris[i]) 940 if not (url.scheme and url.netloc): 941 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i])) 942 self.api_token = api_token 943 self._gateway_services = {} 944 self._keep_services = [{ 945 'uuid': "00000-bi6l4-%015d" % idx, 946 'service_type': 'proxy', 947 '_service_root': uri, 948 } for idx, uri in enumerate(proxy_uris)] 949 self._writable_services = self._keep_services 950 self.using_proxy = True 951 self._static_services_list = True 952 else: 953 # It's important to avoid instantiating an API client 954 # unless we actually need one, for testing's sake. 955 if api_client is None: 956 api_client = arvados.api('v1') 957 self.api_client = api_client 958 self.api_token = api_client.api_token 959 self._gateway_services = {} 960 self._keep_services = None 961 self._writable_services = None 962 self.using_proxy = None 963 self._static_services_list = False 964 try: 965 self._default_classes = [ 966 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']] 967 except KeyError: 968 # We're talking to an old cluster 969 pass 970 971 def current_timeout(self, attempt_number): 972 """Return the appropriate timeout to use for this client. 973 974 The proxy timeout setting if the backend service is currently a proxy, 975 the regular timeout setting otherwise. 
The `attempt_number` indicates 976 how many times the operation has been tried already (starting from 0 977 for the first try), and scales the connection timeout portion of the 978 return value accordingly. 979 980 """ 981 # TODO(twp): the timeout should be a property of a 982 # KeepService, not a KeepClient. See #4488. 983 t = self.proxy_timeout if self.using_proxy else self.timeout 984 if len(t) == 2: 985 return (t[0] * (1 << attempt_number), t[1]) 986 else: 987 return (t[0] * (1 << attempt_number), t[1], t[2]) 988 def _any_nondisk_services(self, service_list): 989 return any(ks.get('service_type', 'disk') != 'disk' 990 for ks in service_list) 991 992 def build_services_list(self, force_rebuild=False): 993 if (self._static_services_list or 994 (self._keep_services and not force_rebuild)): 995 return 996 with self.lock: 997 try: 998 keep_services = self.api_client.keep_services().accessible() 999 except Exception: # API server predates Keep services. 1000 keep_services = self.api_client.keep_disks().list() 1001 1002 # Gateway services are only used when specified by UUID, 1003 # so there's nothing to gain by filtering them by 1004 # service_type. 1005 self._gateway_services = {ks['uuid']: ks for ks in 1006 keep_services.execute()['items']} 1007 if not self._gateway_services: 1008 raise arvados.errors.NoKeepServersError() 1009 1010 # Precompute the base URI for each service. 1011 for r in self._gateway_services.values(): 1012 host = r['service_host'] 1013 if not host.startswith('[') and host.find(':') >= 0: 1014 # IPv6 URIs must be formatted like http://[::1]:80/... 1015 host = '[' + host + ']' 1016 r['_service_root'] = "{}://{}:{:d}/".format( 1017 'https' if r['service_ssl_flag'] else 'http', 1018 host, 1019 r['service_port']) 1020 1021 _logger.debug(str(self._gateway_services)) 1022 self._keep_services = [ 1023 ks for ks in self._gateway_services.values() 1024 if not ks.get('service_type', '').startswith('gateway:')] 1025 self._writable_services = [ks for ks in self._keep_services 1026 if not ks.get('read_only')] 1027 1028 # For disk type services, max_replicas_per_service is 1 1029 # It is unknown (unlimited) for other service types. 1030 if self._any_nondisk_services(self._writable_services): 1031 self.max_replicas_per_service = None 1032 else: 1033 self.max_replicas_per_service = 1 1034 1035 def _service_weight(self, data_hash, service_uuid): 1036 """Compute the weight of a Keep service endpoint for a data 1037 block with a known hash. 1038 1039 The weight is md5(h + u) where u is the last 15 characters of 1040 the service endpoint's UUID. 1041 """ 1042 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest() 1043 1044 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False): 1045 """Return an array of Keep service endpoints, in the order in 1046 which they should be probed when reading or writing data with 1047 the given hash+hints. 1048 """ 1049 self.build_services_list(force_rebuild) 1050 1051 sorted_roots = [] 1052 # Use the services indicated by the given +K@... remote 1053 # service hints, if any are present and can be resolved to a 1054 # URI. 
1055 for hint in locator.hints: 1056 if hint.startswith('K@'): 1057 if len(hint) == 7: 1058 sorted_roots.append( 1059 "https://keep.{}.arvadosapi.com/".format(hint[2:])) 1060 elif len(hint) == 29: 1061 svc = self._gateway_services.get(hint[2:]) 1062 if svc: 1063 sorted_roots.append(svc['_service_root']) 1064 1065 # Sort the available local services by weight (heaviest first) 1066 # for this locator, and return their service_roots (base URIs) 1067 # in that order. 1068 use_services = self._keep_services 1069 if need_writable: 1070 use_services = self._writable_services 1071 self.using_proxy = self._any_nondisk_services(use_services) 1072 sorted_roots.extend([ 1073 svc['_service_root'] for svc in sorted( 1074 use_services, 1075 reverse=True, 1076 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))]) 1077 _logger.debug("{}: {}".format(locator, sorted_roots)) 1078 return sorted_roots 1079 1080 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers): 1081 # roots_map is a dictionary, mapping Keep service root strings 1082 # to KeepService objects. Poll for Keep services, and add any 1083 # new ones to roots_map. Return the current list of local 1084 # root strings. 1085 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,)) 1086 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable) 1087 for root in local_roots: 1088 if root not in roots_map: 1089 roots_map[root] = self.KeepService( 1090 root, self._user_agent_pool, 1091 upload_counter=self.upload_counter, 1092 download_counter=self.download_counter, 1093 headers=headers, 1094 insecure=self.insecure) 1095 return local_roots 1096 1097 @staticmethod 1098 def _check_loop_result(result): 1099 # KeepClient RetryLoops should save results as a 2-tuple: the 1100 # actual result of the request, and the number of servers available 1101 # to receive the request this round. 1102 # This method returns True if there's a real result, False if 1103 # there are no more servers available, otherwise None. 1104 if isinstance(result, Exception): 1105 return None 1106 result, tried_server_count = result 1107 if (result is not None) and (result is not False): 1108 return True 1109 elif tried_server_count < 1: 1110 _logger.info("No more Keep services to try; giving up") 1111 return False 1112 else: 1113 return None 1114 1115 def get_from_cache(self, loc_s): 1116 """Fetch a block only if it is in the cache, otherwise return None.""" 1117 locator = KeepLocator(loc_s) 1118 slot = self.block_cache.get(locator.md5sum) 1119 if slot is not None and slot.ready.is_set(): 1120 return slot.get() 1121 else: 1122 return None 1123 1124 def refresh_signature(self, loc): 1125 """Ask Keep to get the remote block and return its local signature""" 1126 now = datetime.datetime.utcnow().isoformat("T") + 'Z' 1127 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)}) 1128 1129 @retry.retry_method 1130 def head(self, loc_s, **kwargs): 1131 return self._get_or_head(loc_s, method="HEAD", **kwargs) 1132 1133 @retry.retry_method 1134 def get(self, loc_s, **kwargs): 1135 return self._get_or_head(loc_s, method="GET", **kwargs) 1136 1137 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False): 1138 """Get data from Keep. 1139 1140 This method fetches one or more blocks of data from Keep.
It 1141 sends a request to each Keep service registered with the API 1142 server (or the proxy provided when this client was 1143 instantiated), then each service named in location hints, in 1144 sequence. As soon as one service provides the data, it's 1145 returned. 1146 1147 Arguments: 1148 * loc_s: A string of one or more comma-separated locators to fetch. 1149 This method returns the concatenation of these blocks. 1150 * num_retries: The number of times to retry GET requests to 1151 *each* Keep server if it returns temporary failures, with 1152 exponential backoff. Note that, in each loop, the method may try 1153 to fetch data from every available Keep service, along with any 1154 that are named in location hints in the locator. The default value 1155 is set when the KeepClient is initialized. 1156 """ 1157 if ',' in loc_s: 1158 return ''.join(self.get(x) for x in loc_s.split(',')) 1159 1160 self.get_counter.add(1) 1161 1162 request_id = (request_id or 1163 (hasattr(self, 'api_client') and self.api_client.request_id) or 1164 arvados.util.new_request_id()) 1165 if headers is None: 1166 headers = {} 1167 headers['X-Request-Id'] = request_id 1168 1169 slot = None 1170 blob = None 1171 try: 1172 locator = KeepLocator(loc_s) 1173 if method == "GET": 1174 while slot is None: 1175 slot, first = self.block_cache.reserve_cache(locator.md5sum) 1176 if first: 1177 # Fresh and empty "first time it is used" slot 1178 break 1179 if prefetch: 1180 # this is a request for a prefetch to fill in 1181 # the cache, don't need to wait for the 1182 # result, so if it is already in flight return 1183 # immediately. Clear 'slot' to prevent 1184 # finally block from calling slot.set() 1185 if slot.ready.is_set(): 1186 slot.get() 1187 slot = None 1188 return None 1189 1190 blob = slot.get() 1191 if blob is not None: 1192 self.hits_counter.add(1) 1193 return blob 1194 1195 # If blob is None, this means either 1196 # 1197 # (a) another thread was fetching this block and 1198 # failed with an error or 1199 # 1200 # (b) cache thrashing caused the slot to be 1201 # evicted (content set to None) by another thread 1202 # between the call to reserve_cache() and get(). 1203 # 1204 # We'll handle these cases by reserving a new slot 1205 # and then doing a full GET request. 1206 slot = None 1207 1208 self.misses_counter.add(1) 1209 1210 # If the locator has hints specifying a prefix (indicating a 1211 # remote keepproxy) or the UUID of a local gateway service, 1212 # read data from the indicated service(s) instead of the usual 1213 # list of local disk services. 1214 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:]) 1215 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7] 1216 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root'] 1217 for hint in locator.hints if ( 1218 hint.startswith('K@') and 1219 len(hint) == 29 and 1220 self._gateway_services.get(hint[2:]) 1221 )]) 1222 # Map root URLs to their KeepService objects. 1223 roots_map = { 1224 root: self.KeepService(root, self._user_agent_pool, 1225 upload_counter=self.upload_counter, 1226 download_counter=self.download_counter, 1227 headers=headers, 1228 insecure=self.insecure) 1229 for root in hint_roots 1230 } 1231 1232 # See #3147 for a discussion of the loop implementation. Highlights: 1233 # * Refresh the list of Keep services after each failure, in case 1234 # it's being updated. 1235 # * Retry until we succeed, we're out of retries, or every available 1236 # service has returned permanent failure.
1237 sorted_roots = [] 1238 roots_map = {} 1239 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1240 backoff_start=2) 1241 for tries_left in loop: 1242 try: 1243 sorted_roots = self.map_new_services( 1244 roots_map, locator, 1245 force_rebuild=(tries_left < num_retries), 1246 need_writable=False, 1247 headers=headers) 1248 except Exception as error: 1249 loop.save_result(error) 1250 continue 1251 1252 # Query KeepService objects that haven't returned 1253 # permanent failure, in our specified shuffle order. 1254 services_to_try = [roots_map[root] 1255 for root in sorted_roots 1256 if roots_map[root].usable()] 1257 for keep_service in services_to_try: 1258 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left)) 1259 if blob is not None: 1260 break 1261 loop.save_result((blob, len(services_to_try))) 1262 1263 # Always cache the result, then return it if we succeeded. 1264 if loop.success(): 1265 return blob 1266 finally: 1267 if slot is not None: 1268 self.block_cache.set(slot, blob) 1269 1270 # Q: Including 403 is necessary for the Keep tests to continue 1271 # passing, but maybe they should expect KeepReadError instead? 1272 not_founds = sum(1 for key in sorted_roots 1273 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410}) 1274 service_errors = ((key, roots_map[key].last_result()['error']) 1275 for key in sorted_roots) 1276 if not roots_map: 1277 raise arvados.errors.KeepReadError( 1278 "[{}] failed to read {}: no Keep services available ({})".format( 1279 request_id, loc_s, loop.last_result())) 1280 elif not_founds == len(sorted_roots): 1281 raise arvados.errors.NotFoundError( 1282 "[{}] {} not found".format(request_id, loc_s), service_errors) 1283 else: 1284 raise arvados.errors.KeepReadError( 1285 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service") 1286 1287 @retry.retry_method 1288 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None): 1289 """Save data in Keep. 1290 1291 This method will get a list of Keep services from the API server, and 1292 send the data to each one simultaneously in a new thread. Once the 1293 uploads are finished, if enough copies are saved, this method returns 1294 the most recent HTTP response body. If requests fail to upload 1295 enough copies, this method raises KeepWriteError. 1296 1297 Arguments: 1298 * data: The string of data to upload. 1299 * copies: The number of copies that the user requires be saved. 1300 Default 2. 1301 * num_retries: The number of times to retry PUT requests to 1302 *each* Keep server if it returns temporary failures, with 1303 exponential backoff. The default value is set when the 1304 KeepClient is initialized. 1305 * classes: An optional list of storage class names where copies should 1306 be written. 
1307 """ 1308 1309 classes = classes or self._default_classes 1310 1311 if not isinstance(data, bytes): 1312 data = data.encode() 1313 1314 self.put_counter.add(1) 1315 1316 data_hash = hashlib.md5(data).hexdigest() 1317 loc_s = data_hash + '+' + str(len(data)) 1318 if copies < 1: 1319 return loc_s 1320 locator = KeepLocator(loc_s) 1321 1322 request_id = (request_id or 1323 (hasattr(self, 'api_client') and self.api_client.request_id) or 1324 arvados.util.new_request_id()) 1325 headers = { 1326 'X-Request-Id': request_id, 1327 'X-Keep-Desired-Replicas': str(copies), 1328 } 1329 roots_map = {} 1330 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1331 backoff_start=2) 1332 done_copies = 0 1333 done_classes = [] 1334 for tries_left in loop: 1335 try: 1336 sorted_roots = self.map_new_services( 1337 roots_map, locator, 1338 force_rebuild=(tries_left < num_retries), 1339 need_writable=True, 1340 headers=headers) 1341 except Exception as error: 1342 loop.save_result(error) 1343 continue 1344 1345 pending_classes = [] 1346 if done_classes is not None: 1347 pending_classes = list(set(classes) - set(done_classes)) 1348 writer_pool = KeepClient.KeepWriterThreadPool(data=data, 1349 data_hash=data_hash, 1350 copies=copies - done_copies, 1351 max_service_replicas=self.max_replicas_per_service, 1352 timeout=self.current_timeout(num_retries - tries_left), 1353 classes=pending_classes) 1354 for service_root, ks in [(root, roots_map[root]) 1355 for root in sorted_roots]: 1356 if ks.finished(): 1357 continue 1358 writer_pool.add_task(ks, service_root) 1359 writer_pool.join() 1360 pool_copies, pool_classes = writer_pool.done() 1361 done_copies += pool_copies 1362 if (done_classes is not None) and (pool_classes is not None): 1363 done_classes += pool_classes 1364 loop.save_result( 1365 (done_copies >= copies and set(done_classes) == set(classes), 1366 writer_pool.total_task_nr)) 1367 else: 1368 # Old keepstore contacted without storage classes support: 1369 # success is determined only by successful copies. 1370 # 1371 # Disable storage classes tracking from this point forward. 
1372 if not self._storage_classes_unsupported_warning: 1373 self._storage_classes_unsupported_warning = True 1374 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster") 1375 done_classes = None 1376 loop.save_result( 1377 (done_copies >= copies, writer_pool.total_task_nr)) 1378 1379 if loop.success(): 1380 return writer_pool.response() 1381 if not roots_map: 1382 raise arvados.errors.KeepWriteError( 1383 "[{}] failed to write {}: no Keep services available ({})".format( 1384 request_id, data_hash, loop.last_result())) 1385 else: 1386 service_errors = ((key, roots_map[key].last_result()['error']) 1387 for key in sorted_roots 1388 if roots_map[key].last_result()['error']) 1389 raise arvados.errors.KeepWriteError( 1390 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format( 1391 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service") 1392 1393 def _block_prefetch_worker(self): 1394 """The background downloader thread.""" 1395 while True: 1396 try: 1397 b = self._prefetch_queue.get() 1398 if b is None: 1399 return 1400 self.get(b, prefetch=True) 1401 except Exception: 1402 _logger.exception("Exception doing block prefetch") 1403 1404 def _start_prefetch_threads(self): 1405 if self._prefetch_threads is None: 1406 with self.lock: 1407 if self._prefetch_threads is not None: 1408 return 1409 self._prefetch_queue = queue.Queue() 1410 self._prefetch_threads = [] 1411 for i in range(0, self.num_prefetch_threads): 1412 thread = threading.Thread(target=self._block_prefetch_worker) 1413 self._prefetch_threads.append(thread) 1414 thread.daemon = True 1415 thread.start() 1416 1417 def block_prefetch(self, locator): 1418 """ 1419 This relies on the fact that KeepClient implements a block cache, 1420 so repeated requests for the same block will not result in repeated 1421 downloads (unless the block is evicted from the cache.) This method 1422 does not block. 1423 """ 1424 1425 if self.block_cache.get(locator) is not None: 1426 return 1427 1428 self._start_prefetch_threads() 1429 self._prefetch_queue.put(locator) 1430 1431 def stop_prefetch_threads(self): 1432 with self.lock: 1433 if self._prefetch_threads is not None: 1434 for t in self._prefetch_threads: 1435 self._prefetch_queue.put(None) 1436 for t in self._prefetch_threads: 1437 t.join() 1438 self._prefetch_threads = None 1439 self._prefetch_queue = None 1440 1441 def local_store_put(self, data, copies=1, num_retries=None, classes=[]): 1442 """A stub for put(). 1443 1444 This method is used in place of the real put() method when 1445 using local storage (see constructor's local_store argument). 1446 1447 copies and num_retries arguments are ignored: they are here 1448 only for the sake of offering the same call signature as 1449 put(). 1450 1451 Data stored this way can be retrieved via local_store_get(). 
1452 """ 1453 md5 = hashlib.md5(data).hexdigest() 1454 locator = '%s+%d' % (md5, len(data)) 1455 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f: 1456 f.write(data) 1457 os.rename(os.path.join(self.local_store, md5 + '.tmp'), 1458 os.path.join(self.local_store, md5)) 1459 return locator 1460 1461 def local_store_get(self, loc_s, num_retries=None): 1462 """Companion to local_store_put().""" 1463 try: 1464 locator = KeepLocator(loc_s) 1465 except ValueError: 1466 raise arvados.errors.NotFoundError( 1467 "Invalid data locator: '%s'" % loc_s) 1468 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1469 return b'' 1470 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f: 1471 return f.read() 1472 1473 def local_store_head(self, loc_s, num_retries=None): 1474 """Companion to local_store_put().""" 1475 try: 1476 locator = KeepLocator(loc_s) 1477 except ValueError: 1478 raise arvados.errors.NotFoundError( 1479 "Invalid data locator: '%s'" % loc_s) 1480 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1481 return True 1482 if os.path.exists(os.path.join(self.local_store, locator.md5sum)): 1483 return True
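Per the :proxy: and :api_token: notes in the __init__ docstring above, a client can also be pointed straight at a keepproxy without building an API client first; the URL and token here are placeholders:

kc = arvados.keep.KeepClient(
    proxy="https://keep.zzzzz.example.org:25107/",  # placeholder proxy URL
    api_token="placeholder-token")
locator = kc.put(b"sent via the proxy")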
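current_timeout() above doubles only the connection-timeout element of the tuple on each attempt, leaving the read timeout and minimum-bandwidth floor untouched. With the default non-proxy setting of (2, 256, 32768):

for attempt_number in range(3):
    print((2 * (1 << attempt_number), 256, 32768))
# attempt 0 -> (2, 256, 32768)
# attempt 1 -> (4, 256, 32768)
# attempt 2 -> (8, 256, 32768)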
Return the appropriate timeout to use for this client.

The proxy timeout setting if the backend service is currently a proxy, the regular timeout setting otherwise. The attempt_number indicates how many times the operation has been tried already (starting from 0 for the first try), and scales the connection timeout portion of the return value accordingly.
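With the default non-proxy timeout of (2, 256, 32768), the connection timeout doubles on each attempt while the read timeout and bandwidth floor stay fixed. A sketch (kc is a hypothetical KeepClient whose backend services are not proxies):

    [kc.current_timeout(n) for n in (0, 1, 2)]
    # -> [(2, 256, 32768), (4, 256, 32768), (8, 256, 32768)]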
def build_services_list(self, force_rebuild=False):
    if (self._static_services_list or
            (self._keep_services and not force_rebuild)):
        return
    with self.lock:
        try:
            keep_services = self.api_client.keep_services().accessible()
        except Exception:  # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        # Gateway services are only used when specified by UUID,
        # so there's nothing to gain by filtering them by
        # service_type.
        self._gateway_services = {ks['uuid']: ks for ks in
                                  keep_services.execute()['items']}
        if not self._gateway_services:
            raise arvados.errors.NoKeepServersError()

        # Precompute the base URI for each service.
        for r in self._gateway_services.values():
            host = r['service_host']
            if not host.startswith('[') and host.find(':') >= 0:
                # IPv6 URIs must be formatted like http://[::1]:80/...
                host = '[' + host + ']'
            r['_service_root'] = "{}://{}:{:d}/".format(
                'https' if r['service_ssl_flag'] else 'http',
                host,
                r['service_port'])

        _logger.debug(str(self._gateway_services))
        self._keep_services = [
            ks for ks in self._gateway_services.values()
            if not ks.get('service_type', '').startswith('gateway:')]
        self._writable_services = [ks for ks in self._keep_services
                                   if not ks.get('read_only')]

        # For disk type services, max_replicas_per_service is 1
        # It is unknown (unlimited) for other service types.
        if self._any_nondisk_services(self._writable_services):
            self.max_replicas_per_service = None
        else:
            self.max_replicas_per_service = 1
def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
    """Return an array of Keep service endpoints, in the order in
    which they should be probed when reading or writing data with
    the given hash+hints.
    """
    self.build_services_list(force_rebuild)

    sorted_roots = []
    # Use the services indicated by the given +K@... remote
    # service hints, if any are present and can be resolved to a
    # URI.
    for hint in locator.hints:
        if hint.startswith('K@'):
            if len(hint) == 7:
                sorted_roots.append(
                    "https://keep.{}.arvadosapi.com/".format(hint[2:]))
            elif len(hint) == 29:
                svc = self._gateway_services.get(hint[2:])
                if svc:
                    sorted_roots.append(svc['_service_root'])

    # Sort the available local services by weight (heaviest first)
    # for this locator, and return their service_roots (base URIs)
    # in that order.
    use_services = self._keep_services
    if need_writable:
        use_services = self._writable_services
    self.using_proxy = self._any_nondisk_services(use_services)
    sorted_roots.extend([
        svc['_service_root'] for svc in sorted(
            use_services,
            reverse=True,
            key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
    _logger.debug("{}: {}".format(locator, sorted_roots))
    return sorted_roots
Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing data with the given hash+hints.
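The per-locator ordering is a rendezvous-hashing scheme: each service is weighted by hashing the block's MD5 digest together with the service UUID, and services are probed heaviest-first. A standalone sketch of the idea (the weight function below is an illustrative assumption, not necessarily the library's exact _service_weight implementation):

    import hashlib

    def service_weight(block_md5, service_uuid):
        # Deterministic per-(block, service) weight; sorting by it gives
        # every block its own stable probe order across clients.
        return hashlib.md5((block_md5 + service_uuid).encode()).hexdigest()

    services = ['zzzzz-bi6l4-000000000000001', 'zzzzz-bi6l4-000000000000002']
    block = 'd41d8cd98f00b204e9800998ecf8427e'
    probe_order = sorted(
        services, key=lambda uuid: service_weight(block, uuid), reverse=True)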
def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
    # roots_map is a dictionary, mapping Keep service root strings
    # to KeepService objects. Poll for Keep services, and add any
    # new ones to roots_map. Return the current list of local
    # root strings.
    headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
    local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
    for root in local_roots:
        if root not in roots_map:
            roots_map[root] = self.KeepService(
                root, self._user_agent_pool,
                upload_counter=self.upload_counter,
                download_counter=self.download_counter,
                headers=headers,
                insecure=self.insecure)
    return local_roots
def get_from_cache(self, loc_s):
    """Fetch a block only if it is in the cache, otherwise return None."""
    locator = KeepLocator(loc_s)
    slot = self.block_cache.get(locator.md5sum)
    if slot is not None and slot.ready.is_set():
        return slot.get()
    else:
        return None
Fetch a block only if it is in the cache, otherwise return None.
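A usage sketch (keep is a hypothetical KeepClient; the locator shown is the well-known empty-block locator, used purely as an example):

    loc = 'd41d8cd98f00b204e9800998ecf8427e+0'
    data = keep.get_from_cache(loc)
    if data is None:
        # Not cached (or still downloading): fall back to a normal get(),
        # which blocks until the block is available.
        data = keep.get(loc)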
def refresh_signature(self, loc):
    """Ask Keep to get the remote block and return its local signature"""
    now = datetime.datetime.utcnow().isoformat("T") + 'Z'
    return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
Ask Keep to get the remote block and return its local signature
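A sketch (keep is a hypothetical KeepClient and loc a signed locator string; the underlying HEAD request carries an X-Keep-Signature header, and the service's reply supplies the freshly signed local locator):

    fresh_loc = keep.refresh_signature(loc)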
@retry.retry_method
def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread. Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body. If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
      Default 2.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff. The default value is set when the
      KeepClient is initialized.
    * classes: An optional list of storage class names where copies should
      be written.
    """

    classes = classes or self._default_classes

    if not isinstance(data, bytes):
        data = data.encode()

    self.put_counter.add(1)

    data_hash = hashlib.md5(data).hexdigest()
    loc_s = data_hash + '+' + str(len(data))
    if copies < 1:
        return loc_s
    locator = KeepLocator(loc_s)

    request_id = (request_id or
                  (hasattr(self, 'api_client') and self.api_client.request_id) or
                  arvados.util.new_request_id())
    headers = {
        'X-Request-Id': request_id,
        'X-Keep-Desired-Replicas': str(copies),
    }
    roots_map = {}
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    done_copies = 0
    done_classes = []
    for tries_left in loop:
        try:
            sorted_roots = self.map_new_services(
                roots_map, locator,
                force_rebuild=(tries_left < num_retries),
                need_writable=True,
                headers=headers)
        except Exception as error:
            loop.save_result(error)
            continue

        pending_classes = []
        if done_classes is not None:
            pending_classes = list(set(classes) - set(done_classes))
        writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                      data_hash=data_hash,
                                                      copies=copies - done_copies,
                                                      max_service_replicas=self.max_replicas_per_service,
                                                      timeout=self.current_timeout(num_retries - tries_left),
                                                      classes=pending_classes)
        for service_root, ks in [(root, roots_map[root])
                                 for root in sorted_roots]:
            if ks.finished():
                continue
            writer_pool.add_task(ks, service_root)
        writer_pool.join()
        pool_copies, pool_classes = writer_pool.done()
        done_copies += pool_copies
        if (done_classes is not None) and (pool_classes is not None):
            done_classes += pool_classes
            loop.save_result(
                (done_copies >= copies and set(done_classes) == set(classes),
                 writer_pool.total_task_nr))
        else:
            # Old keepstore contacted without storage classes support:
            # success is determined only by successful copies.
            #
            # Disable storage classes tracking from this point forward.
            if not self._storage_classes_unsupported_warning:
                self._storage_classes_unsupported_warning = True
                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
            done_classes = None
            loop.save_result(
                (done_copies >= copies, writer_pool.total_task_nr))

    if loop.success():
        return writer_pool.response()
    if not roots_map:
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {}: no Keep services available ({})".format(
                request_id, data_hash, loop.last_result()))
    else:
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots
                          if roots_map[key].last_result()['error'])
        raise arvados.errors.KeepWriteError(
            "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()),
            service_errors, label="service")
Save data in Keep.
This method will get a list of Keep services from the API server, and send the data to each one simultaneously in a new thread. Once the uploads are finished, if enough copies are saved, this method returns the most recent HTTP response body. If requests fail to upload enough copies, this method raises KeepWriteError.
Arguments:
- data: The string of data to upload.
- copies: The number of copies that the user requires be saved. Default 2.
- num_retries: The number of times to retry PUT requests to each Keep server if it returns temporary failures, with exponential backoff. The default value is set when the KeepClient is initialized.
- classes: An optional list of storage class names where copies should be written.
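A usage sketch (assumes keep is a KeepClient on a cluster that defines a 'default' storage class; the returned locator embeds the MD5 digest, the size, and a signed permission hint):

    locator = keep.put(b'hello world', copies=2, classes=['default'])
    # e.g. '5eb63bbbe01eeed093cb22bb8f5acdc3+11+A<signature>@<expiry>'
    md5sum, size = locator.split('+')[:2]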
def block_prefetch(self, locator):
    """
    This relies on the fact that KeepClient implements a block cache,
    so repeated requests for the same block will not result in repeated
    downloads (unless the block is evicted from the cache.) This method
    does not block.
    """

    if self.block_cache.get(locator) is not None:
        return

    self._start_prefetch_threads()
    self._prefetch_queue.put(locator)
Start a background download of the given block, if it is not already cached. This relies on the fact that KeepClient implements a block cache, so repeated requests for the same block will not result in repeated downloads (unless the block is evicted from the cache.) This method does not block.
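A prefetch sketch (keep is a hypothetical KeepClient and upcoming_locators an iterable of locator strings for blocks about to be read):

    for loc in upcoming_locators:
        keep.block_prefetch(loc)   # returns immediately; workers fill the cache
    # Later get() calls for these blocks should be cache hits.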
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies and num_retries arguments are ignored: they are here
    only for the sake of offering the same call signature as
    put().

    Data stored this way can be retrieved via local_store_get().
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
A stub for put().
This method is used in place of the real put() method when using local storage (see constructor’s local_store argument).
copies and num_retries arguments are ignored: they are here only for the sake of offering the same call signature as put().
Data stored this way can be retrieved via local_store_get().
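A testing sketch using the local-store stubs (assumes a scratch directory; passing local_store makes put/get/head the local_store_* methods shown here):

    import tempfile
    import arvados.keep

    store = tempfile.mkdtemp()
    kc = arvados.keep.KeepClient(local_store=store)
    loc = kc.put(b'example data')           # writes <store>/<md5>
    assert kc.get(loc) == b'example data'   # reads the same file back
    assert kc.head(loc) is True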
def local_store_get(self, loc_s, num_retries=None):
    """Companion to local_store_put()."""
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        return b''
    with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
        return f.read()
Companion to local_store_put().
def local_store_head(self, loc_s, num_retries=None):
    """Companion to local_store_put()."""
    try:
        locator = KeepLocator(loc_s)
    except ValueError:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % loc_s)
    if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        return True
    if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
        return True
Companion to local_store_put().
class KeepService(PyCurlHelper):
    """Make requests to a single Keep service, and track results.

    A KeepService is intended to last long enough to perform one
    transaction (GET or PUT) against one Keep service. This can
    involve calling either get() or put() multiple times in order
    to retry after transient failures. However, calling both get()
    and put() on a single instance -- or using the same instance
    to access two different Keep services -- will not produce
    sensible behavior.
    """

    HTTP_ERRORS = (
        socket.error,
        ssl.SSLError,
        arvados.errors.HttpError,
    )

    def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                 upload_counter=None,
                 download_counter=None,
                 headers={},
                 insecure=False):
        super(KeepClient.KeepService, self).__init__()
        self.root = root
        self._user_agent_pool = user_agent_pool
        self._result = {'error': None}
        self._usable = True
        self._session = None
        self._socket = None
        self.get_headers = {'Accept': 'application/octet-stream'}
        self.get_headers.update(headers)
        self.put_headers = headers
        self.upload_counter = upload_counter
        self.download_counter = download_counter
        self.insecure = insecure

    def usable(self):
        """Is it worth attempting a request?"""
        return self._usable

    def finished(self):
        """Did the request succeed or encounter permanent failure?"""
        return self._result['error'] == False or not self._usable

    def last_result(self):
        return self._result

    def _get_user_agent(self):
        try:
            return self._user_agent_pool.get(block=False)
        except queue.Empty:
            return pycurl.Curl()

    def _put_user_agent(self, ua):
        try:
            ua.reset()
            self._user_agent_pool.put(ua, block=False)
        except:
            ua.close()

    def get(self, locator, method="GET", timeout=None):
        # locator is a KeepLocator object.
        url = self.root + str(locator)
        _logger.debug("Request: %s %s", method, url)
        curl = self._get_user_agent()
        ok = None
        try:
            with timer.Timer() as t:
                self._headers = {}
                response_body = BytesIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION,
                            lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                if self.insecure:
                    curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                    curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                else:
                    curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                if method == "HEAD":
                    curl.setopt(pycurl.NOBODY, True)
                else:
                    curl.setopt(pycurl.HTTPGET, True)
                self._setcurltimeouts(curl, timeout, method=="HEAD")

                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                finally:
                    if self._socket:
                        self._socket.close()
                        self._socket = None
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }

            ok = retry.check_http_response_success(self._result['status_code'])
            if not ok:
                self._result['error'] = arvados.errors.HttpError(
                    self._result['status_code'],
                    self._headers.get('x-status-line', 'Error'))
        except self.HTTP_ERRORS as e:
            self._result = {
                'error': e,
            }
        self._usable = ok != False
        if self._result.get('status_code', None):
            # The client worked well enough to get an HTTP status
            # code, so presumably any problems are just on the
            # server side and it's OK to reuse the client.
            self._put_user_agent(curl)
        else:
            # Don't return this client to the pool, in case it's
            # broken.
            curl.close()
        if not ok:
            _logger.debug("Request fail: GET %s => %s: %s",
                          url, type(self._result['error']), str(self._result['error']))
            return None
        if method == "HEAD":
            _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result.get('content-length'))
            if self._result['headers'].get('x-keep-locator'):
                # This is a response to a remote block copy request, return
                # the local copy block locator.
                return self._result['headers'].get('x-keep-locator')
            return True

        _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                     self._result['status_code'],
                     len(self._result['body']),
                     t.msecs,
                     1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

        if self.download_counter:
            self.download_counter.add(len(self._result['body']))
        resp_md5 = hashlib.md5(self._result['body']).hexdigest()
        if resp_md5 != locator.md5sum:
            _logger.warning("Checksum fail: md5(%s) = %s",
                            url, resp_md5)
            self._result['error'] = arvados.errors.HttpError(
                0, 'Checksum fail')
            return None
        return self._result['body']

    def put(self, hash_s, body, timeout=None, headers={}):
        put_headers = copy.copy(self.put_headers)
        put_headers.update(headers)
        url = self.root + hash_s
        _logger.debug("Request: PUT %s", url)
        curl = self._get_user_agent()
        ok = None
        try:
            with timer.Timer() as t:
                self._headers = {}
                body_reader = BytesIO(body)
                response_body = BytesIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION,
                            lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in put_headers.items()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                if self.insecure:
                    curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                    curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                else:
                    curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                self._setcurltimeouts(curl, timeout)
                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                finally:
                    if self._socket:
                        self._socket.close()
                        self._socket = None
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue().decode('utf-8'),
                    'headers': self._headers,
                    'error': False,
                }
            ok = retry.check_http_response_success(self._result['status_code'])
            if not ok:
                self._result['error'] = arvados.errors.HttpError(
                    self._result['status_code'],
                    self._headers.get('x-status-line', 'Error'))
        except self.HTTP_ERRORS as e:
            self._result = {
                'error': e,
            }
        self._usable = ok != False  # still usable if ok is True or None
        if self._result.get('status_code', None):
            # Client is functional. See comment in get().
            self._put_user_agent(curl)
        else:
            curl.close()
        if not ok:
            _logger.debug("Request fail: PUT %s => %s: %s",
                          url, type(self._result['error']), str(self._result['error']))
            return False
        _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                     self._result['status_code'],
                     len(body),
                     t.msecs,
                     1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
        if self.upload_counter:
            self.upload_counter.add(len(body))
        return True
Make requests to a single Keep service, and track results.
A KeepService is intended to last long enough to perform one transaction (GET or PUT) against one Keep service. This can involve calling either get() or put() multiple times in order to retry after transient failures. However, calling both get() and put() on a single instance – or using the same instance to access two different Keep services – will not produce sensible behavior.
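A one-transaction sketch (the service root URL and token are hypothetical; get() takes a KeepLocator object, and last_result() exposes the outcome of the most recent attempt):

    svc = arvados.keep.KeepClient.KeepService(
        'https://keep0.zzzzz.arvadosapi.com/',
        headers={'Authorization': 'OAuth2 <api token>'})
    body = svc.get(arvados.keep.KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0'))
    if body is None:
        print(svc.last_result()['error'])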
class KeepWriterQueue(queue.Queue):
    def __init__(self, copies, classes=[]):
        queue.Queue.__init__(self)  # Old-style superclass
        self.wanted_copies = copies
        self.wanted_storage_classes = classes
        self.successful_copies = 0
        self.confirmed_storage_classes = {}
        self.response = None
        self.storage_classes_tracking = True
        self.queue_data_lock = threading.RLock()
        self.pending_tries = max(copies, len(classes))
        self.pending_tries_notification = threading.Condition()

    def write_success(self, response, replicas_nr, classes_confirmed):
        with self.queue_data_lock:
            self.successful_copies += replicas_nr
            if classes_confirmed is None:
                self.storage_classes_tracking = False
            elif self.storage_classes_tracking:
                for st_class, st_copies in classes_confirmed.items():
                    try:
                        self.confirmed_storage_classes[st_class] += st_copies
                    except KeyError:
                        self.confirmed_storage_classes[st_class] = st_copies
                self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
            self.response = response
        with self.pending_tries_notification:
            self.pending_tries_notification.notify_all()

    def write_fail(self, ks):
        with self.pending_tries_notification:
            self.pending_tries += 1
            self.pending_tries_notification.notify()

    def pending_copies(self):
        with self.queue_data_lock:
            return self.wanted_copies - self.successful_copies

    def satisfied_classes(self):
        with self.queue_data_lock:
            if not self.storage_classes_tracking:
                # Notifies disabled storage classes expectation to
                # the outer loop.
                return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

    def pending_classes(self):
        with self.queue_data_lock:
            if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                return []
            unsatisfied_classes = copy.copy(self.wanted_storage_classes)
            for st_class, st_copies in self.confirmed_storage_classes.items():
                if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                    unsatisfied_classes.remove(st_class)
            return unsatisfied_classes

    def get_next_task(self):
        with self.pending_tries_notification:
            while True:
                if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                    # This notify_all() is unnecessary --
                    # write_success() already called notify_all()
                    # when pending<1 became true, so it's not
                    # possible for any other thread to be in
                    # wait() now -- but it's cheap insurance
                    # against deadlock so we do it anyway:
                    self.pending_tries_notification.notify_all()
                    # Drain the queue and then raise Queue.Empty
                    while True:
                        self.get_nowait()
                        self.task_done()
                elif self.pending_tries > 0:
                    service, service_root = self.get_nowait()
                    if service.finished():
                        self.task_done()
                        continue
                    self.pending_tries -= 1
                    return service, service_root
                elif self.empty():
                    self.pending_tries_notification.notify_all()
                    raise queue.Empty
                else:
                    self.pending_tries_notification.wait()
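For intuition, pending_classes() treats a storage class as satisfied once it has at least wanted_copies confirmed replicas. A standalone sketch of that accounting (values are hypothetical):

    wanted_copies = 2
    wanted_classes = ['default', 'archive']
    confirmed = {'default': 2, 'archive': 1}   # confirmations so far

    pending = [c for c in wanted_classes
               if confirmed.get(c, 0) < wanted_copies]
    # pending == ['archive']: 'archive' still needs one more confirmed copy.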
class KeepWriterThreadPool(object):
    def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
        self.total_task_nr = 0
        if (not max_service_replicas) or (max_service_replicas >= copies):
            num_threads = 1
        else:
            num_threads = int(math.ceil(1.0*copies/max_service_replicas))
        _logger.debug("Pool max threads is %d", num_threads)
        self.workers = []
        self.queue = KeepClient.KeepWriterQueue(copies, classes)
        # Create workers
        for _ in range(num_threads):
            w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
            self.workers.append(w)

    def add_task(self, ks, service_root):
        self.queue.put((ks, service_root))
        self.total_task_nr += 1

    def done(self):
        return self.queue.successful_copies, self.queue.satisfied_classes()

    def join(self):
        # Start workers
        for worker in self.workers:
            worker.start()
        # Wait for finished work
        self.queue.join()

    def response(self):
        return self.queue.response
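The worker count reflects how many replicas one service can hold: with disk-type services (max_service_replicas == 1) and copies == 3 the pool starts ceil(3/1) == 3 threads, while a proxy (no per-service limit) needs only one. A sketch of the same arithmetic:

    import math

    copies, max_service_replicas = 3, 1    # disk services store one replica each
    if not max_service_replicas or max_service_replicas >= copies:
        num_threads = 1
    else:
        num_threads = int(math.ceil(1.0 * copies / max_service_replicas))
    # num_threads == 3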
class KeepWriterThread(threading.Thread):
    class TaskFailed(RuntimeError): pass

    def __init__(self, queue, data, data_hash, timeout=None):
        super(KeepClient.KeepWriterThread, self).__init__()
        self.timeout = timeout
        self.queue = queue
        self.data = data
        self.data_hash = data_hash
        self.daemon = True

    def run(self):
        while True:
            try:
                service, service_root = self.queue.get_next_task()
            except queue.Empty:
                return
            try:
                locator, copies, classes = self.do_task(service, service_root)
            except Exception as e:
                if not isinstance(e, self.TaskFailed):
                    _logger.exception("Exception in KeepWriterThread")
                self.queue.write_fail(service)
            else:
                self.queue.write_success(locator, copies, classes)
            finally:
                self.queue.task_done()

    def do_task(self, service, service_root):
        classes = self.queue.pending_classes()
        headers = {}
        if len(classes) > 0:
            classes.sort()
            headers['X-Keep-Storage-Classes'] = ', '.join(classes)
        success = bool(service.put(self.data_hash,
                                   self.data,
                                   timeout=self.timeout,
                                   headers=headers))
        result = service.last_result()

        if not success:
            if result.get('status_code'):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.data_hash,
                              result.get('status_code'),
                              result.get('body'))
            raise self.TaskFailed()

        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                      str(threading.current_thread()),
                      self.data_hash,
                      len(self.data),
                      service_root)
        try:
            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
        except (KeyError, ValueError):
            replicas_stored = 1

        classes_confirmed = {}
        try:
            scch = result['headers']['x-keep-storage-classes-confirmed']
            for confirmation in scch.replace(' ', '').split(','):
                if '=' in confirmation:
                    stored_class, stored_copies = confirmation.split('=')[:2]
                    classes_confirmed[stored_class] = int(stored_copies)
        except (KeyError, ValueError):
            # Storage classes confirmed header missing or corrupt
            classes_confirmed = None

        return result['body'].strip(), replicas_stored, classes_confirmed
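do_task() derives its return value from response headers. For example, a hypothetical X-Keep-Storage-Classes-Confirmed value parses like this:

    scch = 'default=2, archive=1'            # hypothetical header value
    classes_confirmed = {}
    for confirmation in scch.replace(' ', '').split(','):
        if '=' in confirmation:
            stored_class, stored_copies = confirmation.split('=')[:2]
            classes_confirmed[stored_class] = int(stored_copies)
    # classes_confirmed == {'default': 2, 'archive': 1}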