arvados.keep
1# Copyright (C) The Arvados Authors. All rights reserved. 2# 3# SPDX-License-Identifier: Apache-2.0 4 5import copy 6import collections 7import datetime 8import hashlib 9import errno 10import io 11import logging 12import math 13import os 14import pycurl 15import queue 16import re 17import socket 18import ssl 19import sys 20import threading 21import resource 22import urllib.parse 23import traceback 24import weakref 25 26from io import BytesIO 27 28import arvados 29import arvados.config as config 30import arvados.errors 31import arvados.retry as retry 32import arvados.util 33import arvados.diskcache 34from arvados._pycurlhelper import PyCurlHelper 35from . import timer 36 37_logger = logging.getLogger('arvados.keep') 38global_client_object = None 39 40# Monkey patch TCP constants when not available (apple). Values sourced from: 41# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h 42if sys.platform == 'darwin': 43 if not hasattr(socket, 'TCP_KEEPALIVE'): 44 socket.TCP_KEEPALIVE = 0x010 45 if not hasattr(socket, 'TCP_KEEPINTVL'): 46 socket.TCP_KEEPINTVL = 0x101 47 if not hasattr(socket, 'TCP_KEEPCNT'): 48 socket.TCP_KEEPCNT = 0x102 49 50class KeepLocator(object): 51 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0) 52 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$') 53 54 def __init__(self, locator_str): 55 self.hints = [] 56 self._perm_sig = None 57 self._perm_expiry = None 58 pieces = iter(locator_str.split('+')) 59 self.md5sum = next(pieces) 60 try: 61 self.size = int(next(pieces)) 62 except StopIteration: 63 self.size = None 64 for hint in pieces: 65 if self.HINT_RE.match(hint) is None: 66 raise ValueError("invalid hint format: {}".format(hint)) 67 elif hint.startswith('A'): 68 self.parse_permission_hint(hint) 69 else: 70 self.hints.append(hint) 71 72 def __str__(self): 73 return '+'.join( 74 str(s) 75 for s in [self.md5sum, self.size, 76 self.permission_hint()] + self.hints 77 if s is not None) 78 79 def stripped(self): 80 if self.size is not None: 81 return "%s+%i" % (self.md5sum, self.size) 82 else: 83 return self.md5sum 84 85 def _make_hex_prop(name, length): 86 # Build and return a new property with the given name that 87 # must be a hex string of the given length. 88 data_name = '_{}'.format(name) 89 def getter(self): 90 return getattr(self, data_name) 91 def setter(self, hex_str): 92 if not arvados.util.is_hex(hex_str, length): 93 raise ValueError("{} is not a {}-digit hex string: {!r}". 94 format(name, length, hex_str)) 95 setattr(self, data_name, hex_str) 96 return property(getter, setter) 97 98 md5sum = _make_hex_prop('md5sum', 32) 99 perm_sig = _make_hex_prop('perm_sig', 40) 100 101 @property 102 def perm_expiry(self): 103 return self._perm_expiry 104 105 @perm_expiry.setter 106 def perm_expiry(self, value): 107 if not arvados.util.is_hex(value, 1, 8): 108 raise ValueError( 109 "permission timestamp must be a hex Unix timestamp: {}". 
110 format(value)) 111 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16)) 112 113 def permission_hint(self): 114 data = [self.perm_sig, self.perm_expiry] 115 if None in data: 116 return None 117 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds()) 118 return "A{}@{:08x}".format(*data) 119 120 def parse_permission_hint(self, s): 121 try: 122 self.perm_sig, self.perm_expiry = s[1:].split('@', 1) 123 except IndexError: 124 raise ValueError("bad permission hint {}".format(s)) 125 126 def permission_expired(self, as_of_dt=None): 127 if self.perm_expiry is None: 128 return False 129 elif as_of_dt is None: 130 as_of_dt = datetime.datetime.now() 131 return self.perm_expiry <= as_of_dt 132 133 134class Keep(object): 135 """Simple interface to a global KeepClient object. 136 137 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your 138 own API client. The global KeepClient will build an API client from the 139 current Arvados configuration, which may not match the one you built. 140 """ 141 _last_key = None 142 143 @classmethod 144 def global_client_object(cls): 145 global global_client_object 146 # Previously, KeepClient would change its behavior at runtime based 147 # on these configuration settings. We simulate that behavior here 148 # by checking the values and returning a new KeepClient if any of 149 # them have changed. 150 key = (config.get('ARVADOS_API_HOST'), 151 config.get('ARVADOS_API_TOKEN'), 152 config.flag_is_true('ARVADOS_API_HOST_INSECURE'), 153 config.get('ARVADOS_KEEP_PROXY'), 154 os.environ.get('KEEP_LOCAL_STORE')) 155 if (global_client_object is None) or (cls._last_key != key): 156 global_client_object = KeepClient() 157 cls._last_key = key 158 return global_client_object 159 160 @staticmethod 161 def get(locator, **kwargs): 162 return Keep.global_client_object().get(locator, **kwargs) 163 164 @staticmethod 165 def put(data, **kwargs): 166 return Keep.global_client_object().put(data, **kwargs) 167 168class KeepBlockCache(object): 169 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None): 170 self.cache_max = cache_max 171 self._cache = collections.OrderedDict() 172 self._cache_lock = threading.Lock() 173 self._max_slots = max_slots 174 self._disk_cache = disk_cache 175 self._disk_cache_dir = disk_cache_dir 176 self._cache_updating = threading.Condition(self._cache_lock) 177 178 if self._disk_cache and self._disk_cache_dir is None: 179 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep") 180 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True) 181 182 if self._max_slots == 0: 183 if self._disk_cache: 184 # Each block uses two file descriptors, one used to 185 # open it initially and hold the flock(), and a second 186 # hidden one used by mmap(). 187 # 188 # Set max slots to 1/8 of maximum file handles. This 189 # means we'll use at most 1/4 of total file handles. 190 # 191 # NOFILE typically defaults to 1024 on Linux so this 192 # is 128 slots (256 file handles), which means we can 193 # cache up to 8 GiB of 64 MiB blocks. This leaves 194 # 768 file handles for sockets and other stuff. 195 # 196 # When we want the ability to have more cache (e.g. in 197 # arv-mount) we'll increase rlimit before calling 198 # this. 
199 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8) 200 else: 201 # RAM cache slots 202 self._max_slots = 512 203 204 if self.cache_max == 0: 205 if self._disk_cache: 206 fs = os.statvfs(self._disk_cache_dir) 207 # Calculation of available space incorporates existing cache usage 208 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir) 209 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4 210 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10) 211 # pick smallest of: 212 # 10% of total disk size 213 # 25% of available space 214 # max_slots * 64 MiB 215 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024)) 216 else: 217 # 256 MiB in RAM 218 self.cache_max = (256 * 1024 * 1024) 219 220 self.cache_max = max(self.cache_max, 64 * 1024 * 1024) 221 222 self.cache_total = 0 223 if self._disk_cache: 224 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots) 225 for slot in self._cache.values(): 226 self.cache_total += slot.size() 227 self.cap_cache() 228 229 class CacheSlot(object): 230 __slots__ = ("locator", "ready", "content") 231 232 def __init__(self, locator): 233 self.locator = locator 234 self.ready = threading.Event() 235 self.content = None 236 237 def get(self): 238 self.ready.wait() 239 return self.content 240 241 def set(self, value): 242 if self.content is not None: 243 return False 244 self.content = value 245 self.ready.set() 246 return True 247 248 def size(self): 249 if self.content is None: 250 return 0 251 else: 252 return len(self.content) 253 254 def evict(self): 255 self.content = None 256 257 258 def _resize_cache(self, cache_max, max_slots): 259 # Try and make sure the contents of the cache do not exceed 260 # the supplied maximums. 
261 262 if self.cache_total <= cache_max and len(self._cache) <= max_slots: 263 return 264 265 _evict_candidates = collections.deque(self._cache.values()) 266 while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots): 267 slot = _evict_candidates.popleft() 268 if not slot.ready.is_set(): 269 continue 270 271 sz = slot.size() 272 slot.evict() 273 self.cache_total -= sz 274 del self._cache[slot.locator] 275 276 277 def cap_cache(self): 278 '''Cap the cache size to self.cache_max''' 279 with self._cache_updating: 280 self._resize_cache(self.cache_max, self._max_slots) 281 self._cache_updating.notify_all() 282 283 def _get(self, locator): 284 # Test if the locator is already in the cache 285 if locator in self._cache: 286 n = self._cache[locator] 287 if n.ready.is_set() and n.content is None: 288 del self._cache[n.locator] 289 return None 290 self._cache.move_to_end(locator) 291 return n 292 if self._disk_cache: 293 # see if it exists on disk 294 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir) 295 if n is not None: 296 self._cache[n.locator] = n 297 self.cache_total += n.size() 298 return n 299 return None 300 301 def get(self, locator): 302 with self._cache_lock: 303 return self._get(locator) 304 305 def reserve_cache(self, locator): 306 '''Reserve a cache slot for the specified locator, 307 or return the existing slot.''' 308 with self._cache_updating: 309 n = self._get(locator) 310 if n: 311 return n, False 312 else: 313 # Add a new cache slot for the locator 314 self._resize_cache(self.cache_max, self._max_slots-1) 315 while len(self._cache) >= self._max_slots: 316 # If there isn't a slot available, need to wait 317 # for something to happen that releases one of the 318 # cache slots. Idle for 200 ms or woken up by 319 # another thread 320 self._cache_updating.wait(timeout=0.2) 321 self._resize_cache(self.cache_max, self._max_slots-1) 322 323 if self._disk_cache: 324 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir) 325 else: 326 n = KeepBlockCache.CacheSlot(locator) 327 self._cache[n.locator] = n 328 return n, True 329 330 def set(self, slot, blob): 331 try: 332 if slot.set(blob): 333 self.cache_total += slot.size() 334 return 335 except OSError as e: 336 if e.errno == errno.ENOMEM: 337 # Reduce max slots to current - 4, cap cache and retry 338 with self._cache_lock: 339 self._max_slots = max(4, len(self._cache) - 4) 340 elif e.errno == errno.ENOSPC: 341 # Reduce disk max space to current - 256 MiB, cap cache and retry 342 with self._cache_lock: 343 sm = sum(st.size() for st in self._cache.values()) 344 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024)) 345 elif e.errno == errno.ENODEV: 346 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.") 347 except Exception as e: 348 pass 349 finally: 350 # Check if we should evict things from the cache. Either 351 # because we added a new thing or there was an error and 352 # we possibly adjusted the limits down, so we might need 353 # to push something out. 354 self.cap_cache() 355 356 try: 357 # Only gets here if there was an error the first time. The 358 # exception handler adjusts limits downward in some cases 359 # to free up resources, which would make the operation 360 # succeed. 361 if slot.set(blob): 362 self.cache_total += slot.size() 363 except Exception as e: 364 # It failed again. Give up. 
365 slot.set(None) 366 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e)) 367 368 self.cap_cache() 369 370class Counter(object): 371 def __init__(self, v=0): 372 self._lk = threading.Lock() 373 self._val = v 374 375 def add(self, v): 376 with self._lk: 377 self._val += v 378 379 def get(self): 380 with self._lk: 381 return self._val 382 383 384class KeepClient(object): 385 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT 386 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT 387 388 class KeepService(PyCurlHelper): 389 """Make requests to a single Keep service, and track results. 390 391 A KeepService is intended to last long enough to perform one 392 transaction (GET or PUT) against one Keep service. This can 393 involve calling either get() or put() multiple times in order 394 to retry after transient failures. However, calling both get() 395 and put() on a single instance -- or using the same instance 396 to access two different Keep services -- will not produce 397 sensible behavior. 398 """ 399 400 HTTP_ERRORS = ( 401 socket.error, 402 ssl.SSLError, 403 arvados.errors.HttpError, 404 ) 405 406 def __init__(self, root, user_agent_pool=queue.LifoQueue(), 407 upload_counter=None, 408 download_counter=None, 409 headers={}, 410 insecure=False): 411 super(KeepClient.KeepService, self).__init__() 412 self.root = root 413 self._user_agent_pool = user_agent_pool 414 self._result = {'error': None} 415 self._usable = True 416 self._session = None 417 self._socket = None 418 self.get_headers = {'Accept': 'application/octet-stream'} 419 self.get_headers.update(headers) 420 self.put_headers = headers 421 self.upload_counter = upload_counter 422 self.download_counter = download_counter 423 self.insecure = insecure 424 425 def usable(self): 426 """Is it worth attempting a request?""" 427 return self._usable 428 429 def finished(self): 430 """Did the request succeed or encounter permanent failure?""" 431 return self._result['error'] == False or not self._usable 432 433 def last_result(self): 434 return self._result 435 436 def _get_user_agent(self): 437 try: 438 return self._user_agent_pool.get(block=False) 439 except queue.Empty: 440 return pycurl.Curl() 441 442 def _put_user_agent(self, ua): 443 try: 444 ua.reset() 445 self._user_agent_pool.put(ua, block=False) 446 except: 447 ua.close() 448 449 def get(self, locator, method="GET", timeout=None): 450 # locator is a KeepLocator object. 
451 url = self.root + str(locator) 452 _logger.debug("Request: %s %s", method, url) 453 curl = self._get_user_agent() 454 ok = None 455 try: 456 with timer.Timer() as t: 457 self._headers = {} 458 response_body = BytesIO() 459 curl.setopt(pycurl.NOSIGNAL, 1) 460 curl.setopt(pycurl.OPENSOCKETFUNCTION, 461 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 462 curl.setopt(pycurl.URL, url.encode('utf-8')) 463 curl.setopt(pycurl.HTTPHEADER, [ 464 '{}: {}'.format(k,v) for k,v in self.get_headers.items()]) 465 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 466 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 467 if self.insecure: 468 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 469 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 470 else: 471 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 472 if method == "HEAD": 473 curl.setopt(pycurl.NOBODY, True) 474 else: 475 curl.setopt(pycurl.HTTPGET, True) 476 self._setcurltimeouts(curl, timeout, method=="HEAD") 477 478 try: 479 curl.perform() 480 except Exception as e: 481 raise arvados.errors.HttpError(0, str(e)) 482 finally: 483 if self._socket: 484 self._socket.close() 485 self._socket = None 486 self._result = { 487 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 488 'body': response_body.getvalue(), 489 'headers': self._headers, 490 'error': False, 491 } 492 493 ok = retry.check_http_response_success(self._result['status_code']) 494 if not ok: 495 self._result['error'] = arvados.errors.HttpError( 496 self._result['status_code'], 497 self._headers.get('x-status-line', 'Error')) 498 except self.HTTP_ERRORS as e: 499 self._result = { 500 'error': e, 501 } 502 self._usable = ok != False 503 if self._result.get('status_code', None): 504 # The client worked well enough to get an HTTP status 505 # code, so presumably any problems are just on the 506 # server side and it's OK to reuse the client. 507 self._put_user_agent(curl) 508 else: 509 # Don't return this client to the pool, in case it's 510 # broken. 511 curl.close() 512 if not ok: 513 _logger.debug("Request fail: GET %s => %s: %s", 514 url, type(self._result['error']), str(self._result['error'])) 515 return None 516 if method == "HEAD": 517 _logger.info("HEAD %s: %s bytes", 518 self._result['status_code'], 519 self._result.get('content-length')) 520 if self._result['headers'].get('x-keep-locator'): 521 # This is a response to a remote block copy request, return 522 # the local copy block locator. 
523 return self._result['headers'].get('x-keep-locator') 524 return True 525 526 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)", 527 self._result['status_code'], 528 len(self._result['body']), 529 t.msecs, 530 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0) 531 532 if self.download_counter: 533 self.download_counter.add(len(self._result['body'])) 534 resp_md5 = hashlib.md5(self._result['body']).hexdigest() 535 if resp_md5 != locator.md5sum: 536 _logger.warning("Checksum fail: md5(%s) = %s", 537 url, resp_md5) 538 self._result['error'] = arvados.errors.HttpError( 539 0, 'Checksum fail') 540 return None 541 return self._result['body'] 542 543 def put(self, hash_s, body, timeout=None, headers={}): 544 put_headers = copy.copy(self.put_headers) 545 put_headers.update(headers) 546 url = self.root + hash_s 547 _logger.debug("Request: PUT %s", url) 548 curl = self._get_user_agent() 549 ok = None 550 try: 551 with timer.Timer() as t: 552 self._headers = {} 553 body_reader = BytesIO(body) 554 response_body = BytesIO() 555 curl.setopt(pycurl.NOSIGNAL, 1) 556 curl.setopt(pycurl.OPENSOCKETFUNCTION, 557 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 558 curl.setopt(pycurl.URL, url.encode('utf-8')) 559 # Using UPLOAD tells cURL to wait for a "go ahead" from the 560 # Keep server (in the form of a HTTP/1.1 "100 Continue" 561 # response) instead of sending the request body immediately. 562 # This allows the server to reject the request if the request 563 # is invalid or the server is read-only, without waiting for 564 # the client to send the entire block. 565 curl.setopt(pycurl.UPLOAD, True) 566 curl.setopt(pycurl.INFILESIZE, len(body)) 567 curl.setopt(pycurl.READFUNCTION, body_reader.read) 568 curl.setopt(pycurl.HTTPHEADER, [ 569 '{}: {}'.format(k,v) for k,v in put_headers.items()]) 570 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 571 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 572 if self.insecure: 573 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 574 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 575 else: 576 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 577 self._setcurltimeouts(curl, timeout) 578 try: 579 curl.perform() 580 except Exception as e: 581 raise arvados.errors.HttpError(0, str(e)) 582 finally: 583 if self._socket: 584 self._socket.close() 585 self._socket = None 586 self._result = { 587 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 588 'body': response_body.getvalue().decode('utf-8'), 589 'headers': self._headers, 590 'error': False, 591 } 592 ok = retry.check_http_response_success(self._result['status_code']) 593 if not ok: 594 self._result['error'] = arvados.errors.HttpError( 595 self._result['status_code'], 596 self._headers.get('x-status-line', 'Error')) 597 except self.HTTP_ERRORS as e: 598 self._result = { 599 'error': e, 600 } 601 self._usable = ok != False # still usable if ok is True or None 602 if self._result.get('status_code', None): 603 # Client is functional. See comment in get(). 
604 self._put_user_agent(curl) 605 else: 606 curl.close() 607 if not ok: 608 _logger.debug("Request fail: PUT %s => %s: %s", 609 url, type(self._result['error']), str(self._result['error'])) 610 return False 611 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)", 612 self._result['status_code'], 613 len(body), 614 t.msecs, 615 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0) 616 if self.upload_counter: 617 self.upload_counter.add(len(body)) 618 return True 619 620 621 class KeepWriterQueue(queue.Queue): 622 def __init__(self, copies, classes=[]): 623 queue.Queue.__init__(self) # Old-style superclass 624 self.wanted_copies = copies 625 self.wanted_storage_classes = classes 626 self.successful_copies = 0 627 self.confirmed_storage_classes = {} 628 self.response = None 629 self.storage_classes_tracking = True 630 self.queue_data_lock = threading.RLock() 631 self.pending_tries = max(copies, len(classes)) 632 self.pending_tries_notification = threading.Condition() 633 634 def write_success(self, response, replicas_nr, classes_confirmed): 635 with self.queue_data_lock: 636 self.successful_copies += replicas_nr 637 if classes_confirmed is None: 638 self.storage_classes_tracking = False 639 elif self.storage_classes_tracking: 640 for st_class, st_copies in classes_confirmed.items(): 641 try: 642 self.confirmed_storage_classes[st_class] += st_copies 643 except KeyError: 644 self.confirmed_storage_classes[st_class] = st_copies 645 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes())) 646 self.response = response 647 with self.pending_tries_notification: 648 self.pending_tries_notification.notify_all() 649 650 def write_fail(self, ks): 651 with self.pending_tries_notification: 652 self.pending_tries += 1 653 self.pending_tries_notification.notify() 654 655 def pending_copies(self): 656 with self.queue_data_lock: 657 return self.wanted_copies - self.successful_copies 658 659 def satisfied_classes(self): 660 with self.queue_data_lock: 661 if not self.storage_classes_tracking: 662 # Notifies disabled storage classes expectation to 663 # the outer loop. 
664 return None 665 return list(set(self.wanted_storage_classes) - set(self.pending_classes())) 666 667 def pending_classes(self): 668 with self.queue_data_lock: 669 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None): 670 return [] 671 unsatisfied_classes = copy.copy(self.wanted_storage_classes) 672 for st_class, st_copies in self.confirmed_storage_classes.items(): 673 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies: 674 unsatisfied_classes.remove(st_class) 675 return unsatisfied_classes 676 677 def get_next_task(self): 678 with self.pending_tries_notification: 679 while True: 680 if self.pending_copies() < 1 and len(self.pending_classes()) == 0: 681 # This notify_all() is unnecessary -- 682 # write_success() already called notify_all() 683 # when pending<1 became true, so it's not 684 # possible for any other thread to be in 685 # wait() now -- but it's cheap insurance 686 # against deadlock so we do it anyway: 687 self.pending_tries_notification.notify_all() 688 # Drain the queue and then raise Queue.Empty 689 while True: 690 self.get_nowait() 691 self.task_done() 692 elif self.pending_tries > 0: 693 service, service_root = self.get_nowait() 694 if service.finished(): 695 self.task_done() 696 continue 697 self.pending_tries -= 1 698 return service, service_root 699 elif self.empty(): 700 self.pending_tries_notification.notify_all() 701 raise queue.Empty 702 else: 703 self.pending_tries_notification.wait() 704 705 706 class KeepWriterThreadPool(object): 707 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]): 708 self.total_task_nr = 0 709 if (not max_service_replicas) or (max_service_replicas >= copies): 710 num_threads = 1 711 else: 712 num_threads = int(math.ceil(1.0*copies/max_service_replicas)) 713 _logger.debug("Pool max threads is %d", num_threads) 714 self.workers = [] 715 self.queue = KeepClient.KeepWriterQueue(copies, classes) 716 # Create workers 717 for _ in range(num_threads): 718 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout) 719 self.workers.append(w) 720 721 def add_task(self, ks, service_root): 722 self.queue.put((ks, service_root)) 723 self.total_task_nr += 1 724 725 def done(self): 726 return self.queue.successful_copies, self.queue.satisfied_classes() 727 728 def join(self): 729 # Start workers 730 for worker in self.workers: 731 worker.start() 732 # Wait for finished work 733 self.queue.join() 734 735 def response(self): 736 return self.queue.response 737 738 739 class KeepWriterThread(threading.Thread): 740 class TaskFailed(RuntimeError): pass 741 742 def __init__(self, queue, data, data_hash, timeout=None): 743 super(KeepClient.KeepWriterThread, self).__init__() 744 self.timeout = timeout 745 self.queue = queue 746 self.data = data 747 self.data_hash = data_hash 748 self.daemon = True 749 750 def run(self): 751 while True: 752 try: 753 service, service_root = self.queue.get_next_task() 754 except queue.Empty: 755 return 756 try: 757 locator, copies, classes = self.do_task(service, service_root) 758 except Exception as e: 759 if not isinstance(e, self.TaskFailed): 760 _logger.exception("Exception in KeepWriterThread") 761 self.queue.write_fail(service) 762 else: 763 self.queue.write_success(locator, copies, classes) 764 finally: 765 self.queue.task_done() 766 767 def do_task(self, service, service_root): 768 classes = self.queue.pending_classes() 769 headers = {} 770 if len(classes) > 0: 771 classes.sort() 772 headers['X-Keep-Storage-Classes'] = ', 
'.join(classes) 773 success = bool(service.put(self.data_hash, 774 self.data, 775 timeout=self.timeout, 776 headers=headers)) 777 result = service.last_result() 778 779 if not success: 780 if result.get('status_code'): 781 _logger.debug("Request fail: PUT %s => %s %s", 782 self.data_hash, 783 result.get('status_code'), 784 result.get('body')) 785 raise self.TaskFailed() 786 787 _logger.debug("KeepWriterThread %s succeeded %s+%i %s", 788 str(threading.current_thread()), 789 self.data_hash, 790 len(self.data), 791 service_root) 792 try: 793 replicas_stored = int(result['headers']['x-keep-replicas-stored']) 794 except (KeyError, ValueError): 795 replicas_stored = 1 796 797 classes_confirmed = {} 798 try: 799 scch = result['headers']['x-keep-storage-classes-confirmed'] 800 for confirmation in scch.replace(' ', '').split(','): 801 if '=' in confirmation: 802 stored_class, stored_copies = confirmation.split('=')[:2] 803 classes_confirmed[stored_class] = int(stored_copies) 804 except (KeyError, ValueError): 805 # Storage classes confirmed header missing or corrupt 806 classes_confirmed = None 807 808 return result['body'].strip(), replicas_stored, classes_confirmed 809 810 811 def __init__(self, api_client=None, proxy=None, 812 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT, 813 api_token=None, local_store=None, block_cache=None, 814 num_retries=10, session=None, num_prefetch_threads=None): 815 """Initialize a new KeepClient. 816 817 Arguments: 818 :api_client: 819 The API client to use to find Keep services. If not 820 provided, KeepClient will build one from available Arvados 821 configuration. 822 823 :proxy: 824 If specified, this KeepClient will send requests to this Keep 825 proxy. Otherwise, KeepClient will fall back to the setting of the 826 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings. 827 If you want to KeepClient does not use a proxy, pass in an empty 828 string. 829 830 :timeout: 831 The initial timeout (in seconds) for HTTP requests to Keep 832 non-proxy servers. A tuple of three floats is interpreted as 833 (connection_timeout, read_timeout, minimum_bandwidth). A connection 834 will be aborted if the average traffic rate falls below 835 minimum_bandwidth bytes per second over an interval of read_timeout 836 seconds. Because timeouts are often a result of transient server 837 load, the actual connection timeout will be increased by a factor 838 of two on each retry. 839 Default: (2, 256, 32768). 840 841 :proxy_timeout: 842 The initial timeout (in seconds) for HTTP requests to 843 Keep proxies. A tuple of three floats is interpreted as 844 (connection_timeout, read_timeout, minimum_bandwidth). The behavior 845 described above for adjusting connection timeouts on retry also 846 applies. 847 Default: (20, 256, 32768). 848 849 :api_token: 850 If you're not using an API client, but only talking 851 directly to a Keep proxy, this parameter specifies an API token 852 to authenticate Keep requests. It is an error to specify both 853 api_client and api_token. If you specify neither, KeepClient 854 will use one available from the Arvados configuration. 855 856 :local_store: 857 If specified, this KeepClient will bypass Keep 858 services, and save data to the named directory. If unspecified, 859 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE 860 environment variable. If you want to ensure KeepClient does not 861 use local storage, pass in an empty string. This is primarily 862 intended to mock a server for testing. 
863 864 :num_retries: 865 The default number of times to retry failed requests. 866 This will be used as the default num_retries value when get() and 867 put() are called. Default 10. 868 """ 869 self.lock = threading.Lock() 870 if proxy is None: 871 if config.get('ARVADOS_KEEP_SERVICES'): 872 proxy = config.get('ARVADOS_KEEP_SERVICES') 873 else: 874 proxy = config.get('ARVADOS_KEEP_PROXY') 875 if api_token is None: 876 if api_client is None: 877 api_token = config.get('ARVADOS_API_TOKEN') 878 else: 879 api_token = api_client.api_token 880 elif api_client is not None: 881 raise ValueError( 882 "can't build KeepClient with both API client and token") 883 if local_store is None: 884 local_store = os.environ.get('KEEP_LOCAL_STORE') 885 886 if api_client is None: 887 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE') 888 else: 889 self.insecure = api_client.insecure 890 891 self.block_cache = block_cache if block_cache else KeepBlockCache() 892 self.timeout = timeout 893 self.proxy_timeout = proxy_timeout 894 self._user_agent_pool = queue.LifoQueue() 895 self.upload_counter = Counter() 896 self.download_counter = Counter() 897 self.put_counter = Counter() 898 self.get_counter = Counter() 899 self.hits_counter = Counter() 900 self.misses_counter = Counter() 901 self._storage_classes_unsupported_warning = False 902 self._default_classes = [] 903 if num_prefetch_threads is not None: 904 self.num_prefetch_threads = num_prefetch_threads 905 else: 906 self.num_prefetch_threads = 2 907 self._prefetch_queue = None 908 self._prefetch_threads = None 909 910 if local_store: 911 self.local_store = local_store 912 self.head = self.local_store_head 913 self.get = self.local_store_get 914 self.put = self.local_store_put 915 else: 916 self.num_retries = num_retries 917 self.max_replicas_per_service = None 918 if proxy: 919 proxy_uris = proxy.split() 920 for i in range(len(proxy_uris)): 921 if not proxy_uris[i].endswith('/'): 922 proxy_uris[i] += '/' 923 # URL validation 924 url = urllib.parse.urlparse(proxy_uris[i]) 925 if not (url.scheme and url.netloc): 926 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i])) 927 self.api_token = api_token 928 self._gateway_services = {} 929 self._keep_services = [{ 930 'uuid': "00000-bi6l4-%015d" % idx, 931 'service_type': 'proxy', 932 '_service_root': uri, 933 } for idx, uri in enumerate(proxy_uris)] 934 self._writable_services = self._keep_services 935 self.using_proxy = True 936 self._static_services_list = True 937 else: 938 # It's important to avoid instantiating an API client 939 # unless we actually need one, for testing's sake. 940 if api_client is None: 941 api_client = arvados.api('v1') 942 self.api_client = api_client 943 self.api_token = api_client.api_token 944 self._gateway_services = {} 945 self._keep_services = None 946 self._writable_services = None 947 self.using_proxy = None 948 self._static_services_list = False 949 try: 950 self._default_classes = [ 951 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']] 952 except KeyError: 953 # We're talking to an old cluster 954 pass 955 956 def current_timeout(self, attempt_number): 957 """Return the appropriate timeout to use for this client. 958 959 The proxy timeout setting if the backend service is currently a proxy, 960 the regular timeout setting otherwise. 
The `attempt_number` indicates 961 how many times the operation has been tried already (starting from 0 962 for the first try), and scales the connection timeout portion of the 963 return value accordingly. 964 965 """ 966 # TODO(twp): the timeout should be a property of a 967 # KeepService, not a KeepClient. See #4488. 968 t = self.proxy_timeout if self.using_proxy else self.timeout 969 if len(t) == 2: 970 return (t[0] * (1 << attempt_number), t[1]) 971 else: 972 return (t[0] * (1 << attempt_number), t[1], t[2]) 973 def _any_nondisk_services(self, service_list): 974 return any(ks.get('service_type', 'disk') != 'disk' 975 for ks in service_list) 976 977 def build_services_list(self, force_rebuild=False): 978 if (self._static_services_list or 979 (self._keep_services and not force_rebuild)): 980 return 981 with self.lock: 982 try: 983 keep_services = self.api_client.keep_services().accessible() 984 except Exception: # API server predates Keep services. 985 keep_services = self.api_client.keep_disks().list() 986 987 # Gateway services are only used when specified by UUID, 988 # so there's nothing to gain by filtering them by 989 # service_type. 990 self._gateway_services = {ks['uuid']: ks for ks in 991 keep_services.execute()['items']} 992 if not self._gateway_services: 993 raise arvados.errors.NoKeepServersError() 994 995 # Precompute the base URI for each service. 996 for r in self._gateway_services.values(): 997 host = r['service_host'] 998 if not host.startswith('[') and host.find(':') >= 0: 999 # IPv6 URIs must be formatted like http://[::1]:80/... 1000 host = '[' + host + ']' 1001 r['_service_root'] = "{}://{}:{:d}/".format( 1002 'https' if r['service_ssl_flag'] else 'http', 1003 host, 1004 r['service_port']) 1005 1006 _logger.debug(str(self._gateway_services)) 1007 self._keep_services = [ 1008 ks for ks in self._gateway_services.values() 1009 if not ks.get('service_type', '').startswith('gateway:')] 1010 self._writable_services = [ks for ks in self._keep_services 1011 if not ks.get('read_only')] 1012 1013 # For disk type services, max_replicas_per_service is 1 1014 # It is unknown (unlimited) for other service types. 1015 if self._any_nondisk_services(self._writable_services): 1016 self.max_replicas_per_service = None 1017 else: 1018 self.max_replicas_per_service = 1 1019 1020 def _service_weight(self, data_hash, service_uuid): 1021 """Compute the weight of a Keep service endpoint for a data 1022 block with a known hash. 1023 1024 The weight is md5(h + u) where u is the last 15 characters of 1025 the service endpoint's UUID. 1026 """ 1027 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest() 1028 1029 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False): 1030 """Return an array of Keep service endpoints, in the order in 1031 which they should be probed when reading or writing data with 1032 the given hash+hints. 1033 """ 1034 self.build_services_list(force_rebuild) 1035 1036 sorted_roots = [] 1037 # Use the services indicated by the given +K@... remote 1038 # service hints, if any are present and can be resolved to a 1039 # URI. 
1040 for hint in locator.hints: 1041 if hint.startswith('K@'): 1042 if len(hint) == 7: 1043 sorted_roots.append( 1044 "https://keep.{}.arvadosapi.com/".format(hint[2:])) 1045 elif len(hint) == 29: 1046 svc = self._gateway_services.get(hint[2:]) 1047 if svc: 1048 sorted_roots.append(svc['_service_root']) 1049 1050 # Sort the available local services by weight (heaviest first) 1051 # for this locator, and return their service_roots (base URIs) 1052 # in that order. 1053 use_services = self._keep_services 1054 if need_writable: 1055 use_services = self._writable_services 1056 self.using_proxy = self._any_nondisk_services(use_services) 1057 sorted_roots.extend([ 1058 svc['_service_root'] for svc in sorted( 1059 use_services, 1060 reverse=True, 1061 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))]) 1062 _logger.debug("{}: {}".format(locator, sorted_roots)) 1063 return sorted_roots 1064 1065 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers): 1066 # roots_map is a dictionary, mapping Keep service root strings 1067 # to KeepService objects. Poll for Keep services, and add any 1068 # new ones to roots_map. Return the current list of local 1069 # root strings. 1070 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,)) 1071 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable) 1072 for root in local_roots: 1073 if root not in roots_map: 1074 roots_map[root] = self.KeepService( 1075 root, self._user_agent_pool, 1076 upload_counter=self.upload_counter, 1077 download_counter=self.download_counter, 1078 headers=headers, 1079 insecure=self.insecure) 1080 return local_roots 1081 1082 @staticmethod 1083 def _check_loop_result(result): 1084 # KeepClient RetryLoops should save results as a 2-tuple: the 1085 # actual result of the request, and the number of servers available 1086 # to receive the request this round. 1087 # This method returns True if there's a real result, False if 1088 # there are no more servers available, otherwise None. 1089 if isinstance(result, Exception): 1090 return None 1091 result, tried_server_count = result 1092 if (result is not None) and (result is not False): 1093 return True 1094 elif tried_server_count < 1: 1095 _logger.info("No more Keep services to try; giving up") 1096 return False 1097 else: 1098 return None 1099 1100 def get_from_cache(self, loc_s): 1101 """Fetch a block only if is in the cache, otherwise return None.""" 1102 locator = KeepLocator(loc_s) 1103 slot = self.block_cache.get(locator.md5sum) 1104 if slot is not None and slot.ready.is_set(): 1105 return slot.get() 1106 else: 1107 return None 1108 1109 def refresh_signature(self, loc): 1110 """Ask Keep to get the remote block and return its local signature""" 1111 now = datetime.datetime.utcnow().isoformat("T") + 'Z' 1112 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)}) 1113 1114 @retry.retry_method 1115 def head(self, loc_s, **kwargs): 1116 return self._get_or_head(loc_s, method="HEAD", **kwargs) 1117 1118 @retry.retry_method 1119 def get(self, loc_s, **kwargs): 1120 return self._get_or_head(loc_s, method="GET", **kwargs) 1121 1122 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False): 1123 """Get data from Keep. 1124 1125 This method fetches one or more blocks of data from Keep. 
It 1126 sends a request each Keep service registered with the API 1127 server (or the proxy provided when this client was 1128 instantiated), then each service named in location hints, in 1129 sequence. As soon as one service provides the data, it's 1130 returned. 1131 1132 Arguments: 1133 * loc_s: A string of one or more comma-separated locators to fetch. 1134 This method returns the concatenation of these blocks. 1135 * num_retries: The number of times to retry GET requests to 1136 *each* Keep server if it returns temporary failures, with 1137 exponential backoff. Note that, in each loop, the method may try 1138 to fetch data from every available Keep service, along with any 1139 that are named in location hints in the locator. The default value 1140 is set when the KeepClient is initialized. 1141 """ 1142 if ',' in loc_s: 1143 return ''.join(self.get(x) for x in loc_s.split(',')) 1144 1145 self.get_counter.add(1) 1146 1147 request_id = (request_id or 1148 (hasattr(self, 'api_client') and self.api_client.request_id) or 1149 arvados.util.new_request_id()) 1150 if headers is None: 1151 headers = {} 1152 headers['X-Request-Id'] = request_id 1153 1154 slot = None 1155 blob = None 1156 try: 1157 locator = KeepLocator(loc_s) 1158 if method == "GET": 1159 while slot is None: 1160 slot, first = self.block_cache.reserve_cache(locator.md5sum) 1161 if first: 1162 # Fresh and empty "first time it is used" slot 1163 break 1164 if prefetch: 1165 # this is request for a prefetch to fill in 1166 # the cache, don't need to wait for the 1167 # result, so if it is already in flight return 1168 # immediately. Clear 'slot' to prevent 1169 # finally block from calling slot.set() 1170 if slot.ready.is_set(): 1171 slot.get() 1172 slot = None 1173 return None 1174 1175 blob = slot.get() 1176 if blob is not None: 1177 self.hits_counter.add(1) 1178 return blob 1179 1180 # If blob is None, this means either 1181 # 1182 # (a) another thread was fetching this block and 1183 # failed with an error or 1184 # 1185 # (b) cache thrashing caused the slot to be 1186 # evicted (content set to None) by another thread 1187 # between the call to reserve_cache() and get(). 1188 # 1189 # We'll handle these cases by reserving a new slot 1190 # and then doing a full GET request. 1191 slot = None 1192 1193 self.misses_counter.add(1) 1194 1195 # If the locator has hints specifying a prefix (indicating a 1196 # remote keepproxy) or the UUID of a local gateway service, 1197 # read data from the indicated service(s) instead of the usual 1198 # list of local disk services. 1199 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:]) 1200 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7] 1201 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root'] 1202 for hint in locator.hints if ( 1203 hint.startswith('K@') and 1204 len(hint) == 29 and 1205 self._gateway_services.get(hint[2:]) 1206 )]) 1207 # Map root URLs to their KeepService objects. 1208 roots_map = { 1209 root: self.KeepService(root, self._user_agent_pool, 1210 upload_counter=self.upload_counter, 1211 download_counter=self.download_counter, 1212 headers=headers, 1213 insecure=self.insecure) 1214 for root in hint_roots 1215 } 1216 1217 # See #3147 for a discussion of the loop implementation. Highlights: 1218 # * Refresh the list of Keep services after each failure, in case 1219 # it's being updated. 1220 # * Retry until we succeed, we're out of retries, or every available 1221 # service has returned permanent failure. 
1222 sorted_roots = [] 1223 roots_map = {} 1224 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1225 backoff_start=2) 1226 for tries_left in loop: 1227 try: 1228 sorted_roots = self.map_new_services( 1229 roots_map, locator, 1230 force_rebuild=(tries_left < num_retries), 1231 need_writable=False, 1232 headers=headers) 1233 except Exception as error: 1234 loop.save_result(error) 1235 continue 1236 1237 # Query KeepService objects that haven't returned 1238 # permanent failure, in our specified shuffle order. 1239 services_to_try = [roots_map[root] 1240 for root in sorted_roots 1241 if roots_map[root].usable()] 1242 for keep_service in services_to_try: 1243 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left)) 1244 if blob is not None: 1245 break 1246 loop.save_result((blob, len(services_to_try))) 1247 1248 # Always cache the result, then return it if we succeeded. 1249 if loop.success(): 1250 return blob 1251 finally: 1252 if slot is not None: 1253 self.block_cache.set(slot, blob) 1254 1255 # Q: Including 403 is necessary for the Keep tests to continue 1256 # passing, but maybe they should expect KeepReadError instead? 1257 not_founds = sum(1 for key in sorted_roots 1258 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410}) 1259 service_errors = ((key, roots_map[key].last_result()['error']) 1260 for key in sorted_roots) 1261 if not roots_map: 1262 raise arvados.errors.KeepReadError( 1263 "[{}] failed to read {}: no Keep services available ({})".format( 1264 request_id, loc_s, loop.last_result())) 1265 elif not_founds == len(sorted_roots): 1266 raise arvados.errors.NotFoundError( 1267 "[{}] {} not found".format(request_id, loc_s), service_errors) 1268 else: 1269 raise arvados.errors.KeepReadError( 1270 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service") 1271 1272 @retry.retry_method 1273 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None): 1274 """Save data in Keep. 1275 1276 This method will get a list of Keep services from the API server, and 1277 send the data to each one simultaneously in a new thread. Once the 1278 uploads are finished, if enough copies are saved, this method returns 1279 the most recent HTTP response body. If requests fail to upload 1280 enough copies, this method raises KeepWriteError. 1281 1282 Arguments: 1283 * data: The string of data to upload. 1284 * copies: The number of copies that the user requires be saved. 1285 Default 2. 1286 * num_retries: The number of times to retry PUT requests to 1287 *each* Keep server if it returns temporary failures, with 1288 exponential backoff. The default value is set when the 1289 KeepClient is initialized. 1290 * classes: An optional list of storage class names where copies should 1291 be written. 
1292 """ 1293 1294 classes = classes or self._default_classes 1295 1296 if not isinstance(data, bytes): 1297 data = data.encode() 1298 1299 self.put_counter.add(1) 1300 1301 data_hash = hashlib.md5(data).hexdigest() 1302 loc_s = data_hash + '+' + str(len(data)) 1303 if copies < 1: 1304 return loc_s 1305 locator = KeepLocator(loc_s) 1306 1307 request_id = (request_id or 1308 (hasattr(self, 'api_client') and self.api_client.request_id) or 1309 arvados.util.new_request_id()) 1310 headers = { 1311 'X-Request-Id': request_id, 1312 'X-Keep-Desired-Replicas': str(copies), 1313 } 1314 roots_map = {} 1315 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1316 backoff_start=2) 1317 done_copies = 0 1318 done_classes = [] 1319 for tries_left in loop: 1320 try: 1321 sorted_roots = self.map_new_services( 1322 roots_map, locator, 1323 force_rebuild=(tries_left < num_retries), 1324 need_writable=True, 1325 headers=headers) 1326 except Exception as error: 1327 loop.save_result(error) 1328 continue 1329 1330 pending_classes = [] 1331 if done_classes is not None: 1332 pending_classes = list(set(classes) - set(done_classes)) 1333 writer_pool = KeepClient.KeepWriterThreadPool(data=data, 1334 data_hash=data_hash, 1335 copies=copies - done_copies, 1336 max_service_replicas=self.max_replicas_per_service, 1337 timeout=self.current_timeout(num_retries - tries_left), 1338 classes=pending_classes) 1339 for service_root, ks in [(root, roots_map[root]) 1340 for root in sorted_roots]: 1341 if ks.finished(): 1342 continue 1343 writer_pool.add_task(ks, service_root) 1344 writer_pool.join() 1345 pool_copies, pool_classes = writer_pool.done() 1346 done_copies += pool_copies 1347 if (done_classes is not None) and (pool_classes is not None): 1348 done_classes += pool_classes 1349 loop.save_result( 1350 (done_copies >= copies and set(done_classes) == set(classes), 1351 writer_pool.total_task_nr)) 1352 else: 1353 # Old keepstore contacted without storage classes support: 1354 # success is determined only by successful copies. 1355 # 1356 # Disable storage classes tracking from this point forward. 
1357 if not self._storage_classes_unsupported_warning: 1358 self._storage_classes_unsupported_warning = True 1359 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster") 1360 done_classes = None 1361 loop.save_result( 1362 (done_copies >= copies, writer_pool.total_task_nr)) 1363 1364 if loop.success(): 1365 return writer_pool.response() 1366 if not roots_map: 1367 raise arvados.errors.KeepWriteError( 1368 "[{}] failed to write {}: no Keep services available ({})".format( 1369 request_id, data_hash, loop.last_result())) 1370 else: 1371 service_errors = ((key, roots_map[key].last_result()['error']) 1372 for key in sorted_roots 1373 if roots_map[key].last_result()['error']) 1374 raise arvados.errors.KeepWriteError( 1375 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format( 1376 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service") 1377 1378 def _block_prefetch_worker(self): 1379 """The background downloader thread.""" 1380 while True: 1381 try: 1382 b = self._prefetch_queue.get() 1383 if b is None: 1384 return 1385 self.get(b, prefetch=True) 1386 except Exception: 1387 _logger.exception("Exception doing block prefetch") 1388 1389 def _start_prefetch_threads(self): 1390 if self._prefetch_threads is None: 1391 with self.lock: 1392 if self._prefetch_threads is not None: 1393 return 1394 self._prefetch_queue = queue.Queue() 1395 self._prefetch_threads = [] 1396 for i in range(0, self.num_prefetch_threads): 1397 thread = threading.Thread(target=self._block_prefetch_worker) 1398 self._prefetch_threads.append(thread) 1399 thread.daemon = True 1400 thread.start() 1401 1402 def block_prefetch(self, locator): 1403 """ 1404 This relies on the fact that KeepClient implements a block cache, 1405 so repeated requests for the same block will not result in repeated 1406 downloads (unless the block is evicted from the cache.) This method 1407 does not block. 1408 """ 1409 1410 if self.block_cache.get(locator) is not None: 1411 return 1412 1413 self._start_prefetch_threads() 1414 self._prefetch_queue.put(locator) 1415 1416 def stop_prefetch_threads(self): 1417 with self.lock: 1418 if self._prefetch_threads is not None: 1419 for t in self._prefetch_threads: 1420 self._prefetch_queue.put(None) 1421 for t in self._prefetch_threads: 1422 t.join() 1423 self._prefetch_threads = None 1424 self._prefetch_queue = None 1425 1426 def local_store_put(self, data, copies=1, num_retries=None, classes=[]): 1427 """A stub for put(). 1428 1429 This method is used in place of the real put() method when 1430 using local storage (see constructor's local_store argument). 1431 1432 copies and num_retries arguments are ignored: they are here 1433 only for the sake of offering the same call signature as 1434 put(). 1435 1436 Data stored this way can be retrieved via local_store_get(). 
1437 """ 1438 md5 = hashlib.md5(data).hexdigest() 1439 locator = '%s+%d' % (md5, len(data)) 1440 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f: 1441 f.write(data) 1442 os.rename(os.path.join(self.local_store, md5 + '.tmp'), 1443 os.path.join(self.local_store, md5)) 1444 return locator 1445 1446 def local_store_get(self, loc_s, num_retries=None): 1447 """Companion to local_store_put().""" 1448 try: 1449 locator = KeepLocator(loc_s) 1450 except ValueError: 1451 raise arvados.errors.NotFoundError( 1452 "Invalid data locator: '%s'" % loc_s) 1453 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1454 return b'' 1455 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f: 1456 return f.read() 1457 1458 def local_store_head(self, loc_s, num_retries=None): 1459 """Companion to local_store_put().""" 1460 try: 1461 locator = KeepLocator(loc_s) 1462 except ValueError: 1463 raise arvados.errors.NotFoundError( 1464 "Invalid data locator: '%s'" % loc_s) 1465 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1466 return True 1467 if os.path.exists(os.path.join(self.local_store, locator.md5sum)): 1468 return True
51class KeepLocator(object): 52 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0) 53 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$') 54 55 def __init__(self, locator_str): 56 self.hints = [] 57 self._perm_sig = None 58 self._perm_expiry = None 59 pieces = iter(locator_str.split('+')) 60 self.md5sum = next(pieces) 61 try: 62 self.size = int(next(pieces)) 63 except StopIteration: 64 self.size = None 65 for hint in pieces: 66 if self.HINT_RE.match(hint) is None: 67 raise ValueError("invalid hint format: {}".format(hint)) 68 elif hint.startswith('A'): 69 self.parse_permission_hint(hint) 70 else: 71 self.hints.append(hint) 72 73 def __str__(self): 74 return '+'.join( 75 str(s) 76 for s in [self.md5sum, self.size, 77 self.permission_hint()] + self.hints 78 if s is not None) 79 80 def stripped(self): 81 if self.size is not None: 82 return "%s+%i" % (self.md5sum, self.size) 83 else: 84 return self.md5sum 85 86 def _make_hex_prop(name, length): 87 # Build and return a new property with the given name that 88 # must be a hex string of the given length. 89 data_name = '_{}'.format(name) 90 def getter(self): 91 return getattr(self, data_name) 92 def setter(self, hex_str): 93 if not arvados.util.is_hex(hex_str, length): 94 raise ValueError("{} is not a {}-digit hex string: {!r}". 95 format(name, length, hex_str)) 96 setattr(self, data_name, hex_str) 97 return property(getter, setter) 98 99 md5sum = _make_hex_prop('md5sum', 32) 100 perm_sig = _make_hex_prop('perm_sig', 40) 101 102 @property 103 def perm_expiry(self): 104 return self._perm_expiry 105 106 @perm_expiry.setter 107 def perm_expiry(self, value): 108 if not arvados.util.is_hex(value, 1, 8): 109 raise ValueError( 110 "permission timestamp must be a hex Unix timestamp: {}". 111 format(value)) 112 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16)) 113 114 def permission_hint(self): 115 data = [self.perm_sig, self.perm_expiry] 116 if None in data: 117 return None 118 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds()) 119 return "A{}@{:08x}".format(*data) 120 121 def parse_permission_hint(self, s): 122 try: 123 self.perm_sig, self.perm_expiry = s[1:].split('@', 1) 124 except IndexError: 125 raise ValueError("bad permission hint {}".format(s)) 126 127 def permission_expired(self, as_of_dt=None): 128 if self.perm_expiry is None: 129 return False 130 elif as_of_dt is None: 131 as_of_dt = datetime.datetime.now() 132 return self.perm_expiry <= as_of_dt
55 def __init__(self, locator_str): 56 self.hints = [] 57 self._perm_sig = None 58 self._perm_expiry = None 59 pieces = iter(locator_str.split('+')) 60 self.md5sum = next(pieces) 61 try: 62 self.size = int(next(pieces)) 63 except StopIteration: 64 self.size = None 65 for hint in pieces: 66 if self.HINT_RE.match(hint) is None: 67 raise ValueError("invalid hint format: {}".format(hint)) 68 elif hint.startswith('A'): 69 self.parse_permission_hint(hint) 70 else: 71 self.hints.append(hint)
135class Keep(object): 136 """Simple interface to a global KeepClient object. 137 138 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your 139 own API client. The global KeepClient will build an API client from the 140 current Arvados configuration, which may not match the one you built. 141 """ 142 _last_key = None 143 144 @classmethod 145 def global_client_object(cls): 146 global global_client_object 147 # Previously, KeepClient would change its behavior at runtime based 148 # on these configuration settings. We simulate that behavior here 149 # by checking the values and returning a new KeepClient if any of 150 # them have changed. 151 key = (config.get('ARVADOS_API_HOST'), 152 config.get('ARVADOS_API_TOKEN'), 153 config.flag_is_true('ARVADOS_API_HOST_INSECURE'), 154 config.get('ARVADOS_KEEP_PROXY'), 155 os.environ.get('KEEP_LOCAL_STORE')) 156 if (global_client_object is None) or (cls._last_key != key): 157 global_client_object = KeepClient() 158 cls._last_key = key 159 return global_client_object 160 161 @staticmethod 162 def get(locator, **kwargs): 163 return Keep.global_client_object().get(locator, **kwargs) 164 165 @staticmethod 166 def put(data, **kwargs): 167 return Keep.global_client_object().put(data, **kwargs)
Simple interface to a global KeepClient object.
THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your own API client. The global KeepClient will build an API client from the current Arvados configuration, which may not match the one you built.
144 @classmethod 145 def global_client_object(cls): 146 global global_client_object 147 # Previously, KeepClient would change its behavior at runtime based 148 # on these configuration settings. We simulate that behavior here 149 # by checking the values and returning a new KeepClient if any of 150 # them have changed. 151 key = (config.get('ARVADOS_API_HOST'), 152 config.get('ARVADOS_API_TOKEN'), 153 config.flag_is_true('ARVADOS_API_HOST_INSECURE'), 154 config.get('ARVADOS_KEEP_PROXY'), 155 os.environ.get('KEEP_LOCAL_STORE')) 156 if (global_client_object is None) or (cls._last_key != key): 157 global_client_object = KeepClient() 158 cls._last_key = key 159 return global_client_object
169class KeepBlockCache(object): 170 def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None): 171 self.cache_max = cache_max 172 self._cache = collections.OrderedDict() 173 self._cache_lock = threading.Lock() 174 self._max_slots = max_slots 175 self._disk_cache = disk_cache 176 self._disk_cache_dir = disk_cache_dir 177 self._cache_updating = threading.Condition(self._cache_lock) 178 179 if self._disk_cache and self._disk_cache_dir is None: 180 self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep") 181 os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True) 182 183 if self._max_slots == 0: 184 if self._disk_cache: 185 # Each block uses two file descriptors, one used to 186 # open it initially and hold the flock(), and a second 187 # hidden one used by mmap(). 188 # 189 # Set max slots to 1/8 of maximum file handles. This 190 # means we'll use at most 1/4 of total file handles. 191 # 192 # NOFILE typically defaults to 1024 on Linux so this 193 # is 128 slots (256 file handles), which means we can 194 # cache up to 8 GiB of 64 MiB blocks. This leaves 195 # 768 file handles for sockets and other stuff. 196 # 197 # When we want the ability to have more cache (e.g. in 198 # arv-mount) we'll increase rlimit before calling 199 # this. 200 self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8) 201 else: 202 # RAM cache slots 203 self._max_slots = 512 204 205 if self.cache_max == 0: 206 if self._disk_cache: 207 fs = os.statvfs(self._disk_cache_dir) 208 # Calculation of available space incorporates existing cache usage 209 existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir) 210 avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4 211 maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10) 212 # pick smallest of: 213 # 10% of total disk size 214 # 25% of available space 215 # max_slots * 64 MiB 216 self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024)) 217 else: 218 # 256 MiB in RAM 219 self.cache_max = (256 * 1024 * 1024) 220 221 self.cache_max = max(self.cache_max, 64 * 1024 * 1024) 222 223 self.cache_total = 0 224 if self._disk_cache: 225 self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots) 226 for slot in self._cache.values(): 227 self.cache_total += slot.size() 228 self.cap_cache() 229 230 class CacheSlot(object): 231 __slots__ = ("locator", "ready", "content") 232 233 def __init__(self, locator): 234 self.locator = locator 235 self.ready = threading.Event() 236 self.content = None 237 238 def get(self): 239 self.ready.wait() 240 return self.content 241 242 def set(self, value): 243 if self.content is not None: 244 return False 245 self.content = value 246 self.ready.set() 247 return True 248 249 def size(self): 250 if self.content is None: 251 return 0 252 else: 253 return len(self.content) 254 255 def evict(self): 256 self.content = None 257 258 259 def _resize_cache(self, cache_max, max_slots): 260 # Try and make sure the contents of the cache do not exceed 261 # the supplied maximums. 
262 263 if self.cache_total <= cache_max and len(self._cache) <= max_slots: 264 return 265 266 _evict_candidates = collections.deque(self._cache.values()) 267 while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots): 268 slot = _evict_candidates.popleft() 269 if not slot.ready.is_set(): 270 continue 271 272 sz = slot.size() 273 slot.evict() 274 self.cache_total -= sz 275 del self._cache[slot.locator] 276 277 278 def cap_cache(self): 279 '''Cap the cache size to self.cache_max''' 280 with self._cache_updating: 281 self._resize_cache(self.cache_max, self._max_slots) 282 self._cache_updating.notify_all() 283 284 def _get(self, locator): 285 # Test if the locator is already in the cache 286 if locator in self._cache: 287 n = self._cache[locator] 288 if n.ready.is_set() and n.content is None: 289 del self._cache[n.locator] 290 return None 291 self._cache.move_to_end(locator) 292 return n 293 if self._disk_cache: 294 # see if it exists on disk 295 n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir) 296 if n is not None: 297 self._cache[n.locator] = n 298 self.cache_total += n.size() 299 return n 300 return None 301 302 def get(self, locator): 303 with self._cache_lock: 304 return self._get(locator) 305 306 def reserve_cache(self, locator): 307 '''Reserve a cache slot for the specified locator, 308 or return the existing slot.''' 309 with self._cache_updating: 310 n = self._get(locator) 311 if n: 312 return n, False 313 else: 314 # Add a new cache slot for the locator 315 self._resize_cache(self.cache_max, self._max_slots-1) 316 while len(self._cache) >= self._max_slots: 317 # If there isn't a slot available, need to wait 318 # for something to happen that releases one of the 319 # cache slots. Idle for 200 ms or woken up by 320 # another thread 321 self._cache_updating.wait(timeout=0.2) 322 self._resize_cache(self.cache_max, self._max_slots-1) 323 324 if self._disk_cache: 325 n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir) 326 else: 327 n = KeepBlockCache.CacheSlot(locator) 328 self._cache[n.locator] = n 329 return n, True 330 331 def set(self, slot, blob): 332 try: 333 if slot.set(blob): 334 self.cache_total += slot.size() 335 return 336 except OSError as e: 337 if e.errno == errno.ENOMEM: 338 # Reduce max slots to current - 4, cap cache and retry 339 with self._cache_lock: 340 self._max_slots = max(4, len(self._cache) - 4) 341 elif e.errno == errno.ENOSPC: 342 # Reduce disk max space to current - 256 MiB, cap cache and retry 343 with self._cache_lock: 344 sm = sum(st.size() for st in self._cache.values()) 345 self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024)) 346 elif e.errno == errno.ENODEV: 347 _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.") 348 except Exception as e: 349 pass 350 finally: 351 # Check if we should evict things from the cache. Either 352 # because we added a new thing or there was an error and 353 # we possibly adjusted the limits down, so we might need 354 # to push something out. 355 self.cap_cache() 356 357 try: 358 # Only gets here if there was an error the first time. The 359 # exception handler adjusts limits downward in some cases 360 # to free up resources, which would make the operation 361 # succeed. 362 if slot.set(blob): 363 self.cache_total += slot.size() 364 except Exception as e: 365 # It failed again. Give up. 
366 slot.set(None) 367 raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e)) 368 369 self.cap_cache()
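A sketch of the reserve/fetch/set protocol the cache expects from its callers, mirroring how KeepClient._get_or_head() uses it later in this module (locator_md5 and fetch_block() are hypothetical placeholders standing in for a real block hash and a real Keep GET):

from arvados.keep import KeepBlockCache

cache = KeepBlockCache(cache_max=256 * 1024 * 1024)   # in-RAM cache

slot, is_new = cache.reserve_cache(locator_md5)
if is_new:
    # We own this slot: fetch the block and publish it (or None on failure)
    # so that any other threads blocked in slot.get() wake up.
    try:
        blob = fetch_block(locator_md5)
    except Exception:
        blob = None
    cache.set(slot, blob)
else:
    # Another thread is (or was) fetching the same block; wait for its result.
    blob = slot.get()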
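To make the disk-cache sizing comments in KeepBlockCache.__init__ above concrete, a small back-of-the-envelope computation assuming the documented Linux default of a 1024 file-descriptor limit and 64 MiB Keep blocks:

nofile = 1024                         # typical RLIMIT_NOFILE soft limit
max_slots = nofile // 8               # 128 slots; each slot holds two descriptors
block = 64 * 1024 * 1024              # a Keep block is at most 64 MiB
print(max_slots * block // 2**30)     # -> 8, i.e. up to 8 GiB of cached blocks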
385class KeepClient(object): 386 DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT 387 DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT 388 389 class KeepService(PyCurlHelper): 390 """Make requests to a single Keep service, and track results. 391 392 A KeepService is intended to last long enough to perform one 393 transaction (GET or PUT) against one Keep service. This can 394 involve calling either get() or put() multiple times in order 395 to retry after transient failures. However, calling both get() 396 and put() on a single instance -- or using the same instance 397 to access two different Keep services -- will not produce 398 sensible behavior. 399 """ 400 401 HTTP_ERRORS = ( 402 socket.error, 403 ssl.SSLError, 404 arvados.errors.HttpError, 405 ) 406 407 def __init__(self, root, user_agent_pool=queue.LifoQueue(), 408 upload_counter=None, 409 download_counter=None, 410 headers={}, 411 insecure=False): 412 super(KeepClient.KeepService, self).__init__() 413 self.root = root 414 self._user_agent_pool = user_agent_pool 415 self._result = {'error': None} 416 self._usable = True 417 self._session = None 418 self._socket = None 419 self.get_headers = {'Accept': 'application/octet-stream'} 420 self.get_headers.update(headers) 421 self.put_headers = headers 422 self.upload_counter = upload_counter 423 self.download_counter = download_counter 424 self.insecure = insecure 425 426 def usable(self): 427 """Is it worth attempting a request?""" 428 return self._usable 429 430 def finished(self): 431 """Did the request succeed or encounter permanent failure?""" 432 return self._result['error'] == False or not self._usable 433 434 def last_result(self): 435 return self._result 436 437 def _get_user_agent(self): 438 try: 439 return self._user_agent_pool.get(block=False) 440 except queue.Empty: 441 return pycurl.Curl() 442 443 def _put_user_agent(self, ua): 444 try: 445 ua.reset() 446 self._user_agent_pool.put(ua, block=False) 447 except: 448 ua.close() 449 450 def get(self, locator, method="GET", timeout=None): 451 # locator is a KeepLocator object. 
452 url = self.root + str(locator) 453 _logger.debug("Request: %s %s", method, url) 454 curl = self._get_user_agent() 455 ok = None 456 try: 457 with timer.Timer() as t: 458 self._headers = {} 459 response_body = BytesIO() 460 curl.setopt(pycurl.NOSIGNAL, 1) 461 curl.setopt(pycurl.OPENSOCKETFUNCTION, 462 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 463 curl.setopt(pycurl.URL, url.encode('utf-8')) 464 curl.setopt(pycurl.HTTPHEADER, [ 465 '{}: {}'.format(k,v) for k,v in self.get_headers.items()]) 466 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 467 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 468 if self.insecure: 469 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 470 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 471 else: 472 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 473 if method == "HEAD": 474 curl.setopt(pycurl.NOBODY, True) 475 else: 476 curl.setopt(pycurl.HTTPGET, True) 477 self._setcurltimeouts(curl, timeout, method=="HEAD") 478 479 try: 480 curl.perform() 481 except Exception as e: 482 raise arvados.errors.HttpError(0, str(e)) 483 finally: 484 if self._socket: 485 self._socket.close() 486 self._socket = None 487 self._result = { 488 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 489 'body': response_body.getvalue(), 490 'headers': self._headers, 491 'error': False, 492 } 493 494 ok = retry.check_http_response_success(self._result['status_code']) 495 if not ok: 496 self._result['error'] = arvados.errors.HttpError( 497 self._result['status_code'], 498 self._headers.get('x-status-line', 'Error')) 499 except self.HTTP_ERRORS as e: 500 self._result = { 501 'error': e, 502 } 503 self._usable = ok != False 504 if self._result.get('status_code', None): 505 # The client worked well enough to get an HTTP status 506 # code, so presumably any problems are just on the 507 # server side and it's OK to reuse the client. 508 self._put_user_agent(curl) 509 else: 510 # Don't return this client to the pool, in case it's 511 # broken. 512 curl.close() 513 if not ok: 514 _logger.debug("Request fail: GET %s => %s: %s", 515 url, type(self._result['error']), str(self._result['error'])) 516 return None 517 if method == "HEAD": 518 _logger.info("HEAD %s: %s bytes", 519 self._result['status_code'], 520 self._result.get('content-length')) 521 if self._result['headers'].get('x-keep-locator'): 522 # This is a response to a remote block copy request, return 523 # the local copy block locator. 
524 return self._result['headers'].get('x-keep-locator') 525 return True 526 527 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)", 528 self._result['status_code'], 529 len(self._result['body']), 530 t.msecs, 531 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0) 532 533 if self.download_counter: 534 self.download_counter.add(len(self._result['body'])) 535 resp_md5 = hashlib.md5(self._result['body']).hexdigest() 536 if resp_md5 != locator.md5sum: 537 _logger.warning("Checksum fail: md5(%s) = %s", 538 url, resp_md5) 539 self._result['error'] = arvados.errors.HttpError( 540 0, 'Checksum fail') 541 return None 542 return self._result['body'] 543 544 def put(self, hash_s, body, timeout=None, headers={}): 545 put_headers = copy.copy(self.put_headers) 546 put_headers.update(headers) 547 url = self.root + hash_s 548 _logger.debug("Request: PUT %s", url) 549 curl = self._get_user_agent() 550 ok = None 551 try: 552 with timer.Timer() as t: 553 self._headers = {} 554 body_reader = BytesIO(body) 555 response_body = BytesIO() 556 curl.setopt(pycurl.NOSIGNAL, 1) 557 curl.setopt(pycurl.OPENSOCKETFUNCTION, 558 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 559 curl.setopt(pycurl.URL, url.encode('utf-8')) 560 # Using UPLOAD tells cURL to wait for a "go ahead" from the 561 # Keep server (in the form of a HTTP/1.1 "100 Continue" 562 # response) instead of sending the request body immediately. 563 # This allows the server to reject the request if the request 564 # is invalid or the server is read-only, without waiting for 565 # the client to send the entire block. 566 curl.setopt(pycurl.UPLOAD, True) 567 curl.setopt(pycurl.INFILESIZE, len(body)) 568 curl.setopt(pycurl.READFUNCTION, body_reader.read) 569 curl.setopt(pycurl.HTTPHEADER, [ 570 '{}: {}'.format(k,v) for k,v in put_headers.items()]) 571 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 572 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 573 if self.insecure: 574 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 575 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 576 else: 577 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 578 self._setcurltimeouts(curl, timeout) 579 try: 580 curl.perform() 581 except Exception as e: 582 raise arvados.errors.HttpError(0, str(e)) 583 finally: 584 if self._socket: 585 self._socket.close() 586 self._socket = None 587 self._result = { 588 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 589 'body': response_body.getvalue().decode('utf-8'), 590 'headers': self._headers, 591 'error': False, 592 } 593 ok = retry.check_http_response_success(self._result['status_code']) 594 if not ok: 595 self._result['error'] = arvados.errors.HttpError( 596 self._result['status_code'], 597 self._headers.get('x-status-line', 'Error')) 598 except self.HTTP_ERRORS as e: 599 self._result = { 600 'error': e, 601 } 602 self._usable = ok != False # still usable if ok is True or None 603 if self._result.get('status_code', None): 604 # Client is functional. See comment in get(). 
605 self._put_user_agent(curl) 606 else: 607 curl.close() 608 if not ok: 609 _logger.debug("Request fail: PUT %s => %s: %s", 610 url, type(self._result['error']), str(self._result['error'])) 611 return False 612 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)", 613 self._result['status_code'], 614 len(body), 615 t.msecs, 616 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0) 617 if self.upload_counter: 618 self.upload_counter.add(len(body)) 619 return True 620 621 622 class KeepWriterQueue(queue.Queue): 623 def __init__(self, copies, classes=[]): 624 queue.Queue.__init__(self) # Old-style superclass 625 self.wanted_copies = copies 626 self.wanted_storage_classes = classes 627 self.successful_copies = 0 628 self.confirmed_storage_classes = {} 629 self.response = None 630 self.storage_classes_tracking = True 631 self.queue_data_lock = threading.RLock() 632 self.pending_tries = max(copies, len(classes)) 633 self.pending_tries_notification = threading.Condition() 634 635 def write_success(self, response, replicas_nr, classes_confirmed): 636 with self.queue_data_lock: 637 self.successful_copies += replicas_nr 638 if classes_confirmed is None: 639 self.storage_classes_tracking = False 640 elif self.storage_classes_tracking: 641 for st_class, st_copies in classes_confirmed.items(): 642 try: 643 self.confirmed_storage_classes[st_class] += st_copies 644 except KeyError: 645 self.confirmed_storage_classes[st_class] = st_copies 646 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes())) 647 self.response = response 648 with self.pending_tries_notification: 649 self.pending_tries_notification.notify_all() 650 651 def write_fail(self, ks): 652 with self.pending_tries_notification: 653 self.pending_tries += 1 654 self.pending_tries_notification.notify() 655 656 def pending_copies(self): 657 with self.queue_data_lock: 658 return self.wanted_copies - self.successful_copies 659 660 def satisfied_classes(self): 661 with self.queue_data_lock: 662 if not self.storage_classes_tracking: 663 # Notifies disabled storage classes expectation to 664 # the outer loop. 
665 return None 666 return list(set(self.wanted_storage_classes) - set(self.pending_classes())) 667 668 def pending_classes(self): 669 with self.queue_data_lock: 670 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None): 671 return [] 672 unsatisfied_classes = copy.copy(self.wanted_storage_classes) 673 for st_class, st_copies in self.confirmed_storage_classes.items(): 674 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies: 675 unsatisfied_classes.remove(st_class) 676 return unsatisfied_classes 677 678 def get_next_task(self): 679 with self.pending_tries_notification: 680 while True: 681 if self.pending_copies() < 1 and len(self.pending_classes()) == 0: 682 # This notify_all() is unnecessary -- 683 # write_success() already called notify_all() 684 # when pending<1 became true, so it's not 685 # possible for any other thread to be in 686 # wait() now -- but it's cheap insurance 687 # against deadlock so we do it anyway: 688 self.pending_tries_notification.notify_all() 689 # Drain the queue and then raise Queue.Empty 690 while True: 691 self.get_nowait() 692 self.task_done() 693 elif self.pending_tries > 0: 694 service, service_root = self.get_nowait() 695 if service.finished(): 696 self.task_done() 697 continue 698 self.pending_tries -= 1 699 return service, service_root 700 elif self.empty(): 701 self.pending_tries_notification.notify_all() 702 raise queue.Empty 703 else: 704 self.pending_tries_notification.wait() 705 706 707 class KeepWriterThreadPool(object): 708 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]): 709 self.total_task_nr = 0 710 if (not max_service_replicas) or (max_service_replicas >= copies): 711 num_threads = 1 712 else: 713 num_threads = int(math.ceil(1.0*copies/max_service_replicas)) 714 _logger.debug("Pool max threads is %d", num_threads) 715 self.workers = [] 716 self.queue = KeepClient.KeepWriterQueue(copies, classes) 717 # Create workers 718 for _ in range(num_threads): 719 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout) 720 self.workers.append(w) 721 722 def add_task(self, ks, service_root): 723 self.queue.put((ks, service_root)) 724 self.total_task_nr += 1 725 726 def done(self): 727 return self.queue.successful_copies, self.queue.satisfied_classes() 728 729 def join(self): 730 # Start workers 731 for worker in self.workers: 732 worker.start() 733 # Wait for finished work 734 self.queue.join() 735 736 def response(self): 737 return self.queue.response 738 739 740 class KeepWriterThread(threading.Thread): 741 class TaskFailed(RuntimeError): pass 742 743 def __init__(self, queue, data, data_hash, timeout=None): 744 super(KeepClient.KeepWriterThread, self).__init__() 745 self.timeout = timeout 746 self.queue = queue 747 self.data = data 748 self.data_hash = data_hash 749 self.daemon = True 750 751 def run(self): 752 while True: 753 try: 754 service, service_root = self.queue.get_next_task() 755 except queue.Empty: 756 return 757 try: 758 locator, copies, classes = self.do_task(service, service_root) 759 except Exception as e: 760 if not isinstance(e, self.TaskFailed): 761 _logger.exception("Exception in KeepWriterThread") 762 self.queue.write_fail(service) 763 else: 764 self.queue.write_success(locator, copies, classes) 765 finally: 766 self.queue.task_done() 767 768 def do_task(self, service, service_root): 769 classes = self.queue.pending_classes() 770 headers = {} 771 if len(classes) > 0: 772 classes.sort() 773 headers['X-Keep-Storage-Classes'] = ', 
'.join(classes) 774 success = bool(service.put(self.data_hash, 775 self.data, 776 timeout=self.timeout, 777 headers=headers)) 778 result = service.last_result() 779 780 if not success: 781 if result.get('status_code'): 782 _logger.debug("Request fail: PUT %s => %s %s", 783 self.data_hash, 784 result.get('status_code'), 785 result.get('body')) 786 raise self.TaskFailed() 787 788 _logger.debug("KeepWriterThread %s succeeded %s+%i %s", 789 str(threading.current_thread()), 790 self.data_hash, 791 len(self.data), 792 service_root) 793 try: 794 replicas_stored = int(result['headers']['x-keep-replicas-stored']) 795 except (KeyError, ValueError): 796 replicas_stored = 1 797 798 classes_confirmed = {} 799 try: 800 scch = result['headers']['x-keep-storage-classes-confirmed'] 801 for confirmation in scch.replace(' ', '').split(','): 802 if '=' in confirmation: 803 stored_class, stored_copies = confirmation.split('=')[:2] 804 classes_confirmed[stored_class] = int(stored_copies) 805 except (KeyError, ValueError): 806 # Storage classes confirmed header missing or corrupt 807 classes_confirmed = None 808 809 return result['body'].strip(), replicas_stored, classes_confirmed 810 811 812 def __init__(self, api_client=None, proxy=None, 813 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT, 814 api_token=None, local_store=None, block_cache=None, 815 num_retries=10, session=None, num_prefetch_threads=None): 816 """Initialize a new KeepClient. 817 818 Arguments: 819 :api_client: 820 The API client to use to find Keep services. If not 821 provided, KeepClient will build one from available Arvados 822 configuration. 823 824 :proxy: 825 If specified, this KeepClient will send requests to this Keep 826 proxy. Otherwise, KeepClient will fall back to the setting of the 827 ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings. 828 If you want to KeepClient does not use a proxy, pass in an empty 829 string. 830 831 :timeout: 832 The initial timeout (in seconds) for HTTP requests to Keep 833 non-proxy servers. A tuple of three floats is interpreted as 834 (connection_timeout, read_timeout, minimum_bandwidth). A connection 835 will be aborted if the average traffic rate falls below 836 minimum_bandwidth bytes per second over an interval of read_timeout 837 seconds. Because timeouts are often a result of transient server 838 load, the actual connection timeout will be increased by a factor 839 of two on each retry. 840 Default: (2, 256, 32768). 841 842 :proxy_timeout: 843 The initial timeout (in seconds) for HTTP requests to 844 Keep proxies. A tuple of three floats is interpreted as 845 (connection_timeout, read_timeout, minimum_bandwidth). The behavior 846 described above for adjusting connection timeouts on retry also 847 applies. 848 Default: (20, 256, 32768). 849 850 :api_token: 851 If you're not using an API client, but only talking 852 directly to a Keep proxy, this parameter specifies an API token 853 to authenticate Keep requests. It is an error to specify both 854 api_client and api_token. If you specify neither, KeepClient 855 will use one available from the Arvados configuration. 856 857 :local_store: 858 If specified, this KeepClient will bypass Keep 859 services, and save data to the named directory. If unspecified, 860 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE 861 environment variable. If you want to ensure KeepClient does not 862 use local storage, pass in an empty string. This is primarily 863 intended to mock a server for testing. 
864 865 :num_retries: 866 The default number of times to retry failed requests. 867 This will be used as the default num_retries value when get() and 868 put() are called. Default 10. 869 """ 870 self.lock = threading.Lock() 871 if proxy is None: 872 if config.get('ARVADOS_KEEP_SERVICES'): 873 proxy = config.get('ARVADOS_KEEP_SERVICES') 874 else: 875 proxy = config.get('ARVADOS_KEEP_PROXY') 876 if api_token is None: 877 if api_client is None: 878 api_token = config.get('ARVADOS_API_TOKEN') 879 else: 880 api_token = api_client.api_token 881 elif api_client is not None: 882 raise ValueError( 883 "can't build KeepClient with both API client and token") 884 if local_store is None: 885 local_store = os.environ.get('KEEP_LOCAL_STORE') 886 887 if api_client is None: 888 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE') 889 else: 890 self.insecure = api_client.insecure 891 892 self.block_cache = block_cache if block_cache else KeepBlockCache() 893 self.timeout = timeout 894 self.proxy_timeout = proxy_timeout 895 self._user_agent_pool = queue.LifoQueue() 896 self.upload_counter = Counter() 897 self.download_counter = Counter() 898 self.put_counter = Counter() 899 self.get_counter = Counter() 900 self.hits_counter = Counter() 901 self.misses_counter = Counter() 902 self._storage_classes_unsupported_warning = False 903 self._default_classes = [] 904 if num_prefetch_threads is not None: 905 self.num_prefetch_threads = num_prefetch_threads 906 else: 907 self.num_prefetch_threads = 2 908 self._prefetch_queue = None 909 self._prefetch_threads = None 910 911 if local_store: 912 self.local_store = local_store 913 self.head = self.local_store_head 914 self.get = self.local_store_get 915 self.put = self.local_store_put 916 else: 917 self.num_retries = num_retries 918 self.max_replicas_per_service = None 919 if proxy: 920 proxy_uris = proxy.split() 921 for i in range(len(proxy_uris)): 922 if not proxy_uris[i].endswith('/'): 923 proxy_uris[i] += '/' 924 # URL validation 925 url = urllib.parse.urlparse(proxy_uris[i]) 926 if not (url.scheme and url.netloc): 927 raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i])) 928 self.api_token = api_token 929 self._gateway_services = {} 930 self._keep_services = [{ 931 'uuid': "00000-bi6l4-%015d" % idx, 932 'service_type': 'proxy', 933 '_service_root': uri, 934 } for idx, uri in enumerate(proxy_uris)] 935 self._writable_services = self._keep_services 936 self.using_proxy = True 937 self._static_services_list = True 938 else: 939 # It's important to avoid instantiating an API client 940 # unless we actually need one, for testing's sake. 941 if api_client is None: 942 api_client = arvados.api('v1') 943 self.api_client = api_client 944 self.api_token = api_client.api_token 945 self._gateway_services = {} 946 self._keep_services = None 947 self._writable_services = None 948 self.using_proxy = None 949 self._static_services_list = False 950 try: 951 self._default_classes = [ 952 k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']] 953 except KeyError: 954 # We're talking to an old cluster 955 pass 956 957 def current_timeout(self, attempt_number): 958 """Return the appropriate timeout to use for this client. 959 960 The proxy timeout setting if the backend service is currently a proxy, 961 the regular timeout setting otherwise. 
The `attempt_number` indicates 962 how many times the operation has been tried already (starting from 0 963 for the first try), and scales the connection timeout portion of the 964 return value accordingly. 965 966 """ 967 # TODO(twp): the timeout should be a property of a 968 # KeepService, not a KeepClient. See #4488. 969 t = self.proxy_timeout if self.using_proxy else self.timeout 970 if len(t) == 2: 971 return (t[0] * (1 << attempt_number), t[1]) 972 else: 973 return (t[0] * (1 << attempt_number), t[1], t[2]) 974 def _any_nondisk_services(self, service_list): 975 return any(ks.get('service_type', 'disk') != 'disk' 976 for ks in service_list) 977 978 def build_services_list(self, force_rebuild=False): 979 if (self._static_services_list or 980 (self._keep_services and not force_rebuild)): 981 return 982 with self.lock: 983 try: 984 keep_services = self.api_client.keep_services().accessible() 985 except Exception: # API server predates Keep services. 986 keep_services = self.api_client.keep_disks().list() 987 988 # Gateway services are only used when specified by UUID, 989 # so there's nothing to gain by filtering them by 990 # service_type. 991 self._gateway_services = {ks['uuid']: ks for ks in 992 keep_services.execute()['items']} 993 if not self._gateway_services: 994 raise arvados.errors.NoKeepServersError() 995 996 # Precompute the base URI for each service. 997 for r in self._gateway_services.values(): 998 host = r['service_host'] 999 if not host.startswith('[') and host.find(':') >= 0: 1000 # IPv6 URIs must be formatted like http://[::1]:80/... 1001 host = '[' + host + ']' 1002 r['_service_root'] = "{}://{}:{:d}/".format( 1003 'https' if r['service_ssl_flag'] else 'http', 1004 host, 1005 r['service_port']) 1006 1007 _logger.debug(str(self._gateway_services)) 1008 self._keep_services = [ 1009 ks for ks in self._gateway_services.values() 1010 if not ks.get('service_type', '').startswith('gateway:')] 1011 self._writable_services = [ks for ks in self._keep_services 1012 if not ks.get('read_only')] 1013 1014 # For disk type services, max_replicas_per_service is 1 1015 # It is unknown (unlimited) for other service types. 1016 if self._any_nondisk_services(self._writable_services): 1017 self.max_replicas_per_service = None 1018 else: 1019 self.max_replicas_per_service = 1 1020 1021 def _service_weight(self, data_hash, service_uuid): 1022 """Compute the weight of a Keep service endpoint for a data 1023 block with a known hash. 1024 1025 The weight is md5(h + u) where u is the last 15 characters of 1026 the service endpoint's UUID. 1027 """ 1028 return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest() 1029 1030 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False): 1031 """Return an array of Keep service endpoints, in the order in 1032 which they should be probed when reading or writing data with 1033 the given hash+hints. 1034 """ 1035 self.build_services_list(force_rebuild) 1036 1037 sorted_roots = [] 1038 # Use the services indicated by the given +K@... remote 1039 # service hints, if any are present and can be resolved to a 1040 # URI. 
1041 for hint in locator.hints: 1042 if hint.startswith('K@'): 1043 if len(hint) == 7: 1044 sorted_roots.append( 1045 "https://keep.{}.arvadosapi.com/".format(hint[2:])) 1046 elif len(hint) == 29: 1047 svc = self._gateway_services.get(hint[2:]) 1048 if svc: 1049 sorted_roots.append(svc['_service_root']) 1050 1051 # Sort the available local services by weight (heaviest first) 1052 # for this locator, and return their service_roots (base URIs) 1053 # in that order. 1054 use_services = self._keep_services 1055 if need_writable: 1056 use_services = self._writable_services 1057 self.using_proxy = self._any_nondisk_services(use_services) 1058 sorted_roots.extend([ 1059 svc['_service_root'] for svc in sorted( 1060 use_services, 1061 reverse=True, 1062 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))]) 1063 _logger.debug("{}: {}".format(locator, sorted_roots)) 1064 return sorted_roots 1065 1066 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers): 1067 # roots_map is a dictionary, mapping Keep service root strings 1068 # to KeepService objects. Poll for Keep services, and add any 1069 # new ones to roots_map. Return the current list of local 1070 # root strings. 1071 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,)) 1072 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable) 1073 for root in local_roots: 1074 if root not in roots_map: 1075 roots_map[root] = self.KeepService( 1076 root, self._user_agent_pool, 1077 upload_counter=self.upload_counter, 1078 download_counter=self.download_counter, 1079 headers=headers, 1080 insecure=self.insecure) 1081 return local_roots 1082 1083 @staticmethod 1084 def _check_loop_result(result): 1085 # KeepClient RetryLoops should save results as a 2-tuple: the 1086 # actual result of the request, and the number of servers available 1087 # to receive the request this round. 1088 # This method returns True if there's a real result, False if 1089 # there are no more servers available, otherwise None. 1090 if isinstance(result, Exception): 1091 return None 1092 result, tried_server_count = result 1093 if (result is not None) and (result is not False): 1094 return True 1095 elif tried_server_count < 1: 1096 _logger.info("No more Keep services to try; giving up") 1097 return False 1098 else: 1099 return None 1100 1101 def get_from_cache(self, loc_s): 1102 """Fetch a block only if is in the cache, otherwise return None.""" 1103 locator = KeepLocator(loc_s) 1104 slot = self.block_cache.get(locator.md5sum) 1105 if slot is not None and slot.ready.is_set(): 1106 return slot.get() 1107 else: 1108 return None 1109 1110 def refresh_signature(self, loc): 1111 """Ask Keep to get the remote block and return its local signature""" 1112 now = datetime.datetime.utcnow().isoformat("T") + 'Z' 1113 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)}) 1114 1115 @retry.retry_method 1116 def head(self, loc_s, **kwargs): 1117 return self._get_or_head(loc_s, method="HEAD", **kwargs) 1118 1119 @retry.retry_method 1120 def get(self, loc_s, **kwargs): 1121 return self._get_or_head(loc_s, method="GET", **kwargs) 1122 1123 def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False): 1124 """Get data from Keep. 1125 1126 This method fetches one or more blocks of data from Keep. 
It 1127 sends a request each Keep service registered with the API 1128 server (or the proxy provided when this client was 1129 instantiated), then each service named in location hints, in 1130 sequence. As soon as one service provides the data, it's 1131 returned. 1132 1133 Arguments: 1134 * loc_s: A string of one or more comma-separated locators to fetch. 1135 This method returns the concatenation of these blocks. 1136 * num_retries: The number of times to retry GET requests to 1137 *each* Keep server if it returns temporary failures, with 1138 exponential backoff. Note that, in each loop, the method may try 1139 to fetch data from every available Keep service, along with any 1140 that are named in location hints in the locator. The default value 1141 is set when the KeepClient is initialized. 1142 """ 1143 if ',' in loc_s: 1144 return ''.join(self.get(x) for x in loc_s.split(',')) 1145 1146 self.get_counter.add(1) 1147 1148 request_id = (request_id or 1149 (hasattr(self, 'api_client') and self.api_client.request_id) or 1150 arvados.util.new_request_id()) 1151 if headers is None: 1152 headers = {} 1153 headers['X-Request-Id'] = request_id 1154 1155 slot = None 1156 blob = None 1157 try: 1158 locator = KeepLocator(loc_s) 1159 if method == "GET": 1160 while slot is None: 1161 slot, first = self.block_cache.reserve_cache(locator.md5sum) 1162 if first: 1163 # Fresh and empty "first time it is used" slot 1164 break 1165 if prefetch: 1166 # this is request for a prefetch to fill in 1167 # the cache, don't need to wait for the 1168 # result, so if it is already in flight return 1169 # immediately. Clear 'slot' to prevent 1170 # finally block from calling slot.set() 1171 if slot.ready.is_set(): 1172 slot.get() 1173 slot = None 1174 return None 1175 1176 blob = slot.get() 1177 if blob is not None: 1178 self.hits_counter.add(1) 1179 return blob 1180 1181 # If blob is None, this means either 1182 # 1183 # (a) another thread was fetching this block and 1184 # failed with an error or 1185 # 1186 # (b) cache thrashing caused the slot to be 1187 # evicted (content set to None) by another thread 1188 # between the call to reserve_cache() and get(). 1189 # 1190 # We'll handle these cases by reserving a new slot 1191 # and then doing a full GET request. 1192 slot = None 1193 1194 self.misses_counter.add(1) 1195 1196 # If the locator has hints specifying a prefix (indicating a 1197 # remote keepproxy) or the UUID of a local gateway service, 1198 # read data from the indicated service(s) instead of the usual 1199 # list of local disk services. 1200 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:]) 1201 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7] 1202 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root'] 1203 for hint in locator.hints if ( 1204 hint.startswith('K@') and 1205 len(hint) == 29 and 1206 self._gateway_services.get(hint[2:]) 1207 )]) 1208 # Map root URLs to their KeepService objects. 1209 roots_map = { 1210 root: self.KeepService(root, self._user_agent_pool, 1211 upload_counter=self.upload_counter, 1212 download_counter=self.download_counter, 1213 headers=headers, 1214 insecure=self.insecure) 1215 for root in hint_roots 1216 } 1217 1218 # See #3147 for a discussion of the loop implementation. Highlights: 1219 # * Refresh the list of Keep services after each failure, in case 1220 # it's being updated. 1221 # * Retry until we succeed, we're out of retries, or every available 1222 # service has returned permanent failure. 
1223 sorted_roots = [] 1224 roots_map = {} 1225 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1226 backoff_start=2) 1227 for tries_left in loop: 1228 try: 1229 sorted_roots = self.map_new_services( 1230 roots_map, locator, 1231 force_rebuild=(tries_left < num_retries), 1232 need_writable=False, 1233 headers=headers) 1234 except Exception as error: 1235 loop.save_result(error) 1236 continue 1237 1238 # Query KeepService objects that haven't returned 1239 # permanent failure, in our specified shuffle order. 1240 services_to_try = [roots_map[root] 1241 for root in sorted_roots 1242 if roots_map[root].usable()] 1243 for keep_service in services_to_try: 1244 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left)) 1245 if blob is not None: 1246 break 1247 loop.save_result((blob, len(services_to_try))) 1248 1249 # Always cache the result, then return it if we succeeded. 1250 if loop.success(): 1251 return blob 1252 finally: 1253 if slot is not None: 1254 self.block_cache.set(slot, blob) 1255 1256 # Q: Including 403 is necessary for the Keep tests to continue 1257 # passing, but maybe they should expect KeepReadError instead? 1258 not_founds = sum(1 for key in sorted_roots 1259 if roots_map[key].last_result().get('status_code', None) in {403, 404, 410}) 1260 service_errors = ((key, roots_map[key].last_result()['error']) 1261 for key in sorted_roots) 1262 if not roots_map: 1263 raise arvados.errors.KeepReadError( 1264 "[{}] failed to read {}: no Keep services available ({})".format( 1265 request_id, loc_s, loop.last_result())) 1266 elif not_founds == len(sorted_roots): 1267 raise arvados.errors.NotFoundError( 1268 "[{}] {} not found".format(request_id, loc_s), service_errors) 1269 else: 1270 raise arvados.errors.KeepReadError( 1271 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service") 1272 1273 @retry.retry_method 1274 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None): 1275 """Save data in Keep. 1276 1277 This method will get a list of Keep services from the API server, and 1278 send the data to each one simultaneously in a new thread. Once the 1279 uploads are finished, if enough copies are saved, this method returns 1280 the most recent HTTP response body. If requests fail to upload 1281 enough copies, this method raises KeepWriteError. 1282 1283 Arguments: 1284 * data: The string of data to upload. 1285 * copies: The number of copies that the user requires be saved. 1286 Default 2. 1287 * num_retries: The number of times to retry PUT requests to 1288 *each* Keep server if it returns temporary failures, with 1289 exponential backoff. The default value is set when the 1290 KeepClient is initialized. 1291 * classes: An optional list of storage class names where copies should 1292 be written. 
1293 """ 1294 1295 classes = classes or self._default_classes 1296 1297 if not isinstance(data, bytes): 1298 data = data.encode() 1299 1300 self.put_counter.add(1) 1301 1302 data_hash = hashlib.md5(data).hexdigest() 1303 loc_s = data_hash + '+' + str(len(data)) 1304 if copies < 1: 1305 return loc_s 1306 locator = KeepLocator(loc_s) 1307 1308 request_id = (request_id or 1309 (hasattr(self, 'api_client') and self.api_client.request_id) or 1310 arvados.util.new_request_id()) 1311 headers = { 1312 'X-Request-Id': request_id, 1313 'X-Keep-Desired-Replicas': str(copies), 1314 } 1315 roots_map = {} 1316 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1317 backoff_start=2) 1318 done_copies = 0 1319 done_classes = [] 1320 for tries_left in loop: 1321 try: 1322 sorted_roots = self.map_new_services( 1323 roots_map, locator, 1324 force_rebuild=(tries_left < num_retries), 1325 need_writable=True, 1326 headers=headers) 1327 except Exception as error: 1328 loop.save_result(error) 1329 continue 1330 1331 pending_classes = [] 1332 if done_classes is not None: 1333 pending_classes = list(set(classes) - set(done_classes)) 1334 writer_pool = KeepClient.KeepWriterThreadPool(data=data, 1335 data_hash=data_hash, 1336 copies=copies - done_copies, 1337 max_service_replicas=self.max_replicas_per_service, 1338 timeout=self.current_timeout(num_retries - tries_left), 1339 classes=pending_classes) 1340 for service_root, ks in [(root, roots_map[root]) 1341 for root in sorted_roots]: 1342 if ks.finished(): 1343 continue 1344 writer_pool.add_task(ks, service_root) 1345 writer_pool.join() 1346 pool_copies, pool_classes = writer_pool.done() 1347 done_copies += pool_copies 1348 if (done_classes is not None) and (pool_classes is not None): 1349 done_classes += pool_classes 1350 loop.save_result( 1351 (done_copies >= copies and set(done_classes) == set(classes), 1352 writer_pool.total_task_nr)) 1353 else: 1354 # Old keepstore contacted without storage classes support: 1355 # success is determined only by successful copies. 1356 # 1357 # Disable storage classes tracking from this point forward. 
1358 if not self._storage_classes_unsupported_warning: 1359 self._storage_classes_unsupported_warning = True 1360 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster") 1361 done_classes = None 1362 loop.save_result( 1363 (done_copies >= copies, writer_pool.total_task_nr)) 1364 1365 if loop.success(): 1366 return writer_pool.response() 1367 if not roots_map: 1368 raise arvados.errors.KeepWriteError( 1369 "[{}] failed to write {}: no Keep services available ({})".format( 1370 request_id, data_hash, loop.last_result())) 1371 else: 1372 service_errors = ((key, roots_map[key].last_result()['error']) 1373 for key in sorted_roots 1374 if roots_map[key].last_result()['error']) 1375 raise arvados.errors.KeepWriteError( 1376 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format( 1377 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service") 1378 1379 def _block_prefetch_worker(self): 1380 """The background downloader thread.""" 1381 while True: 1382 try: 1383 b = self._prefetch_queue.get() 1384 if b is None: 1385 return 1386 self.get(b, prefetch=True) 1387 except Exception: 1388 _logger.exception("Exception doing block prefetch") 1389 1390 def _start_prefetch_threads(self): 1391 if self._prefetch_threads is None: 1392 with self.lock: 1393 if self._prefetch_threads is not None: 1394 return 1395 self._prefetch_queue = queue.Queue() 1396 self._prefetch_threads = [] 1397 for i in range(0, self.num_prefetch_threads): 1398 thread = threading.Thread(target=self._block_prefetch_worker) 1399 self._prefetch_threads.append(thread) 1400 thread.daemon = True 1401 thread.start() 1402 1403 def block_prefetch(self, locator): 1404 """ 1405 This relies on the fact that KeepClient implements a block cache, 1406 so repeated requests for the same block will not result in repeated 1407 downloads (unless the block is evicted from the cache.) This method 1408 does not block. 1409 """ 1410 1411 if self.block_cache.get(locator) is not None: 1412 return 1413 1414 self._start_prefetch_threads() 1415 self._prefetch_queue.put(locator) 1416 1417 def stop_prefetch_threads(self): 1418 with self.lock: 1419 if self._prefetch_threads is not None: 1420 for t in self._prefetch_threads: 1421 self._prefetch_queue.put(None) 1422 for t in self._prefetch_threads: 1423 t.join() 1424 self._prefetch_threads = None 1425 self._prefetch_queue = None 1426 1427 def local_store_put(self, data, copies=1, num_retries=None, classes=[]): 1428 """A stub for put(). 1429 1430 This method is used in place of the real put() method when 1431 using local storage (see constructor's local_store argument). 1432 1433 copies and num_retries arguments are ignored: they are here 1434 only for the sake of offering the same call signature as 1435 put(). 1436 1437 Data stored this way can be retrieved via local_store_get(). 
1438 """ 1439 md5 = hashlib.md5(data).hexdigest() 1440 locator = '%s+%d' % (md5, len(data)) 1441 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f: 1442 f.write(data) 1443 os.rename(os.path.join(self.local_store, md5 + '.tmp'), 1444 os.path.join(self.local_store, md5)) 1445 return locator 1446 1447 def local_store_get(self, loc_s, num_retries=None): 1448 """Companion to local_store_put().""" 1449 try: 1450 locator = KeepLocator(loc_s) 1451 except ValueError: 1452 raise arvados.errors.NotFoundError( 1453 "Invalid data locator: '%s'" % loc_s) 1454 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1455 return b'' 1456 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f: 1457 return f.read() 1458 1459 def local_store_head(self, loc_s, num_retries=None): 1460 """Companion to local_store_put().""" 1461 try: 1462 locator = KeepLocator(loc_s) 1463 except ValueError: 1464 raise arvados.errors.NotFoundError( 1465 "Invalid data locator: '%s'" % loc_s) 1466 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1467 return True 1468 if os.path.exists(os.path.join(self.local_store, locator.md5sum)): 1469 return True
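A sketch of the local-store test mode described in the constructor docstring, which bypasses Keep services entirely and stores blocks as plain files:

import os
import tempfile
from arvados.keep import KeepClient

store = tempfile.mkdtemp()
kc = KeepClient(local_store=store)       # get/put/head now use the local_store_* methods

loc = kc.put(b'example data')            # returns "<md5>+12"; block written as <store>/<md5>
assert kc.get(loc) == b'example data'
assert kc.head(loc) is True
assert os.path.exists(os.path.join(store, loc.split('+')[0]))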
:num_retries: The default number of times to retry failed requests. This will be used as the default num_retries value when get() and put() are called. Default 10.
957 def current_timeout(self, attempt_number): 958 """Return the appropriate timeout to use for this client. 959 960 The proxy timeout setting if the backend service is currently a proxy, 961 the regular timeout setting otherwise. The `attempt_number` indicates 962 how many times the operation has been tried already (starting from 0 963 for the first try), and scales the connection timeout portion of the 964 return value accordingly. 965 966 """ 967 # TODO(twp): the timeout should be a property of a 968 # KeepService, not a KeepClient. See #4488. 969 t = self.proxy_timeout if self.using_proxy else self.timeout 970 if len(t) == 2: 971 return (t[0] * (1 << attempt_number), t[1]) 972 else: 973 return (t[0] * (1 << attempt_number), t[1], t[2])
Return the appropriate timeout to use for this client.

The proxy timeout setting if the backend service is currently a proxy, the regular timeout setting otherwise. The attempt_number indicates how many times the operation has been tried already (starting from 0 for the first try), and scales the connection timeout portion of the return value accordingly.
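A small illustration of that scaling, assuming kc is a KeepClient using the default non-proxy timeout of (2, 256, 32768); only the connection portion doubles per attempt:

    kc.current_timeout(0)   # (2, 256, 32768)
    kc.current_timeout(1)   # (4, 256, 32768)
    kc.current_timeout(3)   # (16, 256, 32768)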
978 def build_services_list(self, force_rebuild=False): 979 if (self._static_services_list or 980 (self._keep_services and not force_rebuild)): 981 return 982 with self.lock: 983 try: 984 keep_services = self.api_client.keep_services().accessible() 985 except Exception: # API server predates Keep services. 986 keep_services = self.api_client.keep_disks().list() 987 988 # Gateway services are only used when specified by UUID, 989 # so there's nothing to gain by filtering them by 990 # service_type. 991 self._gateway_services = {ks['uuid']: ks for ks in 992 keep_services.execute()['items']} 993 if not self._gateway_services: 994 raise arvados.errors.NoKeepServersError() 995 996 # Precompute the base URI for each service. 997 for r in self._gateway_services.values(): 998 host = r['service_host'] 999 if not host.startswith('[') and host.find(':') >= 0: 1000 # IPv6 URIs must be formatted like http://[::1]:80/... 1001 host = '[' + host + ']' 1002 r['_service_root'] = "{}://{}:{:d}/".format( 1003 'https' if r['service_ssl_flag'] else 'http', 1004 host, 1005 r['service_port']) 1006 1007 _logger.debug(str(self._gateway_services)) 1008 self._keep_services = [ 1009 ks for ks in self._gateway_services.values() 1010 if not ks.get('service_type', '').startswith('gateway:')] 1011 self._writable_services = [ks for ks in self._keep_services 1012 if not ks.get('read_only')] 1013 1014 # For disk type services, max_replicas_per_service is 1 1015 # It is unknown (unlimited) for other service types. 1016 if self._any_nondisk_services(self._writable_services): 1017 self.max_replicas_per_service = None 1018 else: 1019 self.max_replicas_per_service = 1
1030 def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False): 1031 """Return an array of Keep service endpoints, in the order in 1032 which they should be probed when reading or writing data with 1033 the given hash+hints. 1034 """ 1035 self.build_services_list(force_rebuild) 1036 1037 sorted_roots = [] 1038 # Use the services indicated by the given +K@... remote 1039 # service hints, if any are present and can be resolved to a 1040 # URI. 1041 for hint in locator.hints: 1042 if hint.startswith('K@'): 1043 if len(hint) == 7: 1044 sorted_roots.append( 1045 "https://keep.{}.arvadosapi.com/".format(hint[2:])) 1046 elif len(hint) == 29: 1047 svc = self._gateway_services.get(hint[2:]) 1048 if svc: 1049 sorted_roots.append(svc['_service_root']) 1050 1051 # Sort the available local services by weight (heaviest first) 1052 # for this locator, and return their service_roots (base URIs) 1053 # in that order. 1054 use_services = self._keep_services 1055 if need_writable: 1056 use_services = self._writable_services 1057 self.using_proxy = self._any_nondisk_services(use_services) 1058 sorted_roots.extend([ 1059 svc['_service_root'] for svc in sorted( 1060 use_services, 1061 reverse=True, 1062 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))]) 1063 _logger.debug("{}: {}".format(locator, sorted_roots)) 1064 return sorted_roots
Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing data with the given hash+hints.
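As a hedged illustration of the hint handling above (xxxxx stands in for a real five-character cluster id, and kc is a KeepClient as sketched earlier), a locator carrying a +K@<cluster> hint is probed at that cluster's Keep proxy before the locally weighted services:

    from arvados.keep import KeepLocator

    loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0+K@xxxxx')
    roots = kc.weighted_service_roots(loc)
    # roots[0] is 'https://keep.xxxxx.arvadosapi.com/' per the K@ handling above;
    # the remaining entries are local service roots sorted by weight.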
1066 def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers): 1067 # roots_map is a dictionary, mapping Keep service root strings 1068 # to KeepService objects. Poll for Keep services, and add any 1069 # new ones to roots_map. Return the current list of local 1070 # root strings. 1071 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,)) 1072 local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable) 1073 for root in local_roots: 1074 if root not in roots_map: 1075 roots_map[root] = self.KeepService( 1076 root, self._user_agent_pool, 1077 upload_counter=self.upload_counter, 1078 download_counter=self.download_counter, 1079 headers=headers, 1080 insecure=self.insecure) 1081 return local_roots
1101 def get_from_cache(self, loc_s): 1102 """Fetch a block only if is in the cache, otherwise return None.""" 1103 locator = KeepLocator(loc_s) 1104 slot = self.block_cache.get(locator.md5sum) 1105 if slot is not None and slot.ready.is_set(): 1106 return slot.get() 1107 else: 1108 return None
Fetch a block only if it is in the cache; otherwise return None.
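Usage sketch (loc is a hypothetical signed locator string): check the cache first and only go to the network on a miss.

    data = kc.get_from_cache(loc)
    if data is None:
        data = kc.get(loc)   # fetch from Keep services and populate the cache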
1110 def refresh_signature(self, loc): 1111 """Ask Keep to get the remote block and return its local signature""" 1112 now = datetime.datetime.utcnow().isoformat("T") + 'Z' 1113 return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
Ask Keep to get the remote block and return its local signature
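Sketch, with remote_loc standing in for a locator signed by another cluster: the HEAD request carries an X-Keep-Signature header so the cluster returns a locally signed locator.

    local_loc = kc.refresh_signature(remote_loc)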
1273 @retry.retry_method 1274 def put(self, data, copies=2, num_retries=None, request_id=None, classes=None): 1275 """Save data in Keep. 1276 1277 This method will get a list of Keep services from the API server, and 1278 send the data to each one simultaneously in a new thread. Once the 1279 uploads are finished, if enough copies are saved, this method returns 1280 the most recent HTTP response body. If requests fail to upload 1281 enough copies, this method raises KeepWriteError. 1282 1283 Arguments: 1284 * data: The string of data to upload. 1285 * copies: The number of copies that the user requires be saved. 1286 Default 2. 1287 * num_retries: The number of times to retry PUT requests to 1288 *each* Keep server if it returns temporary failures, with 1289 exponential backoff. The default value is set when the 1290 KeepClient is initialized. 1291 * classes: An optional list of storage class names where copies should 1292 be written. 1293 """ 1294 1295 classes = classes or self._default_classes 1296 1297 if not isinstance(data, bytes): 1298 data = data.encode() 1299 1300 self.put_counter.add(1) 1301 1302 data_hash = hashlib.md5(data).hexdigest() 1303 loc_s = data_hash + '+' + str(len(data)) 1304 if copies < 1: 1305 return loc_s 1306 locator = KeepLocator(loc_s) 1307 1308 request_id = (request_id or 1309 (hasattr(self, 'api_client') and self.api_client.request_id) or 1310 arvados.util.new_request_id()) 1311 headers = { 1312 'X-Request-Id': request_id, 1313 'X-Keep-Desired-Replicas': str(copies), 1314 } 1315 roots_map = {} 1316 loop = retry.RetryLoop(num_retries, self._check_loop_result, 1317 backoff_start=2) 1318 done_copies = 0 1319 done_classes = [] 1320 for tries_left in loop: 1321 try: 1322 sorted_roots = self.map_new_services( 1323 roots_map, locator, 1324 force_rebuild=(tries_left < num_retries), 1325 need_writable=True, 1326 headers=headers) 1327 except Exception as error: 1328 loop.save_result(error) 1329 continue 1330 1331 pending_classes = [] 1332 if done_classes is not None: 1333 pending_classes = list(set(classes) - set(done_classes)) 1334 writer_pool = KeepClient.KeepWriterThreadPool(data=data, 1335 data_hash=data_hash, 1336 copies=copies - done_copies, 1337 max_service_replicas=self.max_replicas_per_service, 1338 timeout=self.current_timeout(num_retries - tries_left), 1339 classes=pending_classes) 1340 for service_root, ks in [(root, roots_map[root]) 1341 for root in sorted_roots]: 1342 if ks.finished(): 1343 continue 1344 writer_pool.add_task(ks, service_root) 1345 writer_pool.join() 1346 pool_copies, pool_classes = writer_pool.done() 1347 done_copies += pool_copies 1348 if (done_classes is not None) and (pool_classes is not None): 1349 done_classes += pool_classes 1350 loop.save_result( 1351 (done_copies >= copies and set(done_classes) == set(classes), 1352 writer_pool.total_task_nr)) 1353 else: 1354 # Old keepstore contacted without storage classes support: 1355 # success is determined only by successful copies. 1356 # 1357 # Disable storage classes tracking from this point forward. 
1358 if not self._storage_classes_unsupported_warning: 1359 self._storage_classes_unsupported_warning = True 1360 _logger.warning("X-Keep-Storage-Classes header not supported by the cluster") 1361 done_classes = None 1362 loop.save_result( 1363 (done_copies >= copies, writer_pool.total_task_nr)) 1364 1365 if loop.success(): 1366 return writer_pool.response() 1367 if not roots_map: 1368 raise arvados.errors.KeepWriteError( 1369 "[{}] failed to write {}: no Keep services available ({})".format( 1370 request_id, data_hash, loop.last_result())) 1371 else: 1372 service_errors = ((key, roots_map[key].last_result()['error']) 1373 for key in sorted_roots 1374 if roots_map[key].last_result()['error']) 1375 raise arvados.errors.KeepWriteError( 1376 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format( 1377 request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
Save data in Keep.
This method will get a list of Keep services from the API server, and send the data to each one simultaneously in a new thread. Once the uploads are finished, if enough copies are saved, this method returns the most recent HTTP response body. If requests fail to upload enough copies, this method raises KeepWriteError.
Arguments:
- data: The string of data to upload.
- copies: The number of copies that the user requires be saved. Default 2.
- num_retries: The number of times to retry PUT requests to each Keep server if it returns temporary failures, with exponential backoff. The default value is set when the KeepClient is initialized.
- classes: An optional list of storage class names where copies should be written.
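Round-trip usage sketch, assuming kc is a KeepClient as above and that the cluster defines a storage class named 'default' (a placeholder name):

    data = b'hello, keep\n'
    locator = kc.put(data, copies=2, classes=['default'])
    # locator has the form '<md5>+<size>+A<signature>@<expiry>' and can be
    # handed straight back to get().
    assert kc.get(locator) == data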
1403 def block_prefetch(self, locator): 1404 """ 1405 This relies on the fact that KeepClient implements a block cache, 1406 so repeated requests for the same block will not result in repeated 1407 downloads (unless the block is evicted from the cache.) This method 1408 does not block. 1409 """ 1410 1411 if self.block_cache.get(locator) is not None: 1412 return 1413 1414 self._start_prefetch_threads() 1415 self._prefetch_queue.put(locator)
Queue a block for background download into the block cache. This relies on the fact that KeepClient implements a block cache, so repeated requests for the same block will not result in repeated downloads (unless the block is evicted from the cache). This method does not block.
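Sketch (upcoming_locators is a hypothetical iterable of locator strings): schedule background downloads so that later get() calls hit the cache.

    for loc in upcoming_locators:
        kc.block_prefetch(loc)   # returns immediately; downloads run on prefetch threads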
1427 def local_store_put(self, data, copies=1, num_retries=None, classes=[]): 1428 """A stub for put(). 1429 1430 This method is used in place of the real put() method when 1431 using local storage (see constructor's local_store argument). 1432 1433 copies and num_retries arguments are ignored: they are here 1434 only for the sake of offering the same call signature as 1435 put(). 1436 1437 Data stored this way can be retrieved via local_store_get(). 1438 """ 1439 md5 = hashlib.md5(data).hexdigest() 1440 locator = '%s+%d' % (md5, len(data)) 1441 with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f: 1442 f.write(data) 1443 os.rename(os.path.join(self.local_store, md5 + '.tmp'), 1444 os.path.join(self.local_store, md5)) 1445 return locator
A stub for put().
This method is used in place of the real put() method when using local storage (see constructor’s local_store argument).
copies and num_retries arguments are ignored: they are here only for the sake of offering the same call signature as put().
Data stored this way can be retrieved via local_store_get().
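Sketch of the local-store testing mode: when local_store (or the KEEP_LOCAL_STORE environment variable) is set, put(), get(), and head() are rebound to these local stubs and no Keep servers are contacted.

    import tempfile
    import arvados.keep

    store = tempfile.mkdtemp()
    kc = arvados.keep.KeepClient(local_store=store)
    loc = kc.put(b'test block')          # writes <store>/<md5 of data>
    assert kc.get(loc) == b'test block'  # reads it back from the directory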
1447 def local_store_get(self, loc_s, num_retries=None): 1448 """Companion to local_store_put().""" 1449 try: 1450 locator = KeepLocator(loc_s) 1451 except ValueError: 1452 raise arvados.errors.NotFoundError( 1453 "Invalid data locator: '%s'" % loc_s) 1454 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1455 return b'' 1456 with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f: 1457 return f.read()
Companion to local_store_put().
1459 def local_store_head(self, loc_s, num_retries=None): 1460 """Companion to local_store_put().""" 1461 try: 1462 locator = KeepLocator(loc_s) 1463 except ValueError: 1464 raise arvados.errors.NotFoundError( 1465 "Invalid data locator: '%s'" % loc_s) 1466 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: 1467 return True 1468 if os.path.exists(os.path.join(self.local_store, locator.md5sum)): 1469 return True
Companion to local_store_put().
389 class KeepService(PyCurlHelper): 390 """Make requests to a single Keep service, and track results. 391 392 A KeepService is intended to last long enough to perform one 393 transaction (GET or PUT) against one Keep service. This can 394 involve calling either get() or put() multiple times in order 395 to retry after transient failures. However, calling both get() 396 and put() on a single instance -- or using the same instance 397 to access two different Keep services -- will not produce 398 sensible behavior. 399 """ 400 401 HTTP_ERRORS = ( 402 socket.error, 403 ssl.SSLError, 404 arvados.errors.HttpError, 405 ) 406 407 def __init__(self, root, user_agent_pool=queue.LifoQueue(), 408 upload_counter=None, 409 download_counter=None, 410 headers={}, 411 insecure=False): 412 super(KeepClient.KeepService, self).__init__() 413 self.root = root 414 self._user_agent_pool = user_agent_pool 415 self._result = {'error': None} 416 self._usable = True 417 self._session = None 418 self._socket = None 419 self.get_headers = {'Accept': 'application/octet-stream'} 420 self.get_headers.update(headers) 421 self.put_headers = headers 422 self.upload_counter = upload_counter 423 self.download_counter = download_counter 424 self.insecure = insecure 425 426 def usable(self): 427 """Is it worth attempting a request?""" 428 return self._usable 429 430 def finished(self): 431 """Did the request succeed or encounter permanent failure?""" 432 return self._result['error'] == False or not self._usable 433 434 def last_result(self): 435 return self._result 436 437 def _get_user_agent(self): 438 try: 439 return self._user_agent_pool.get(block=False) 440 except queue.Empty: 441 return pycurl.Curl() 442 443 def _put_user_agent(self, ua): 444 try: 445 ua.reset() 446 self._user_agent_pool.put(ua, block=False) 447 except: 448 ua.close() 449 450 def get(self, locator, method="GET", timeout=None): 451 # locator is a KeepLocator object. 
452 url = self.root + str(locator) 453 _logger.debug("Request: %s %s", method, url) 454 curl = self._get_user_agent() 455 ok = None 456 try: 457 with timer.Timer() as t: 458 self._headers = {} 459 response_body = BytesIO() 460 curl.setopt(pycurl.NOSIGNAL, 1) 461 curl.setopt(pycurl.OPENSOCKETFUNCTION, 462 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 463 curl.setopt(pycurl.URL, url.encode('utf-8')) 464 curl.setopt(pycurl.HTTPHEADER, [ 465 '{}: {}'.format(k,v) for k,v in self.get_headers.items()]) 466 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 467 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 468 if self.insecure: 469 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 470 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 471 else: 472 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 473 if method == "HEAD": 474 curl.setopt(pycurl.NOBODY, True) 475 else: 476 curl.setopt(pycurl.HTTPGET, True) 477 self._setcurltimeouts(curl, timeout, method=="HEAD") 478 479 try: 480 curl.perform() 481 except Exception as e: 482 raise arvados.errors.HttpError(0, str(e)) 483 finally: 484 if self._socket: 485 self._socket.close() 486 self._socket = None 487 self._result = { 488 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 489 'body': response_body.getvalue(), 490 'headers': self._headers, 491 'error': False, 492 } 493 494 ok = retry.check_http_response_success(self._result['status_code']) 495 if not ok: 496 self._result['error'] = arvados.errors.HttpError( 497 self._result['status_code'], 498 self._headers.get('x-status-line', 'Error')) 499 except self.HTTP_ERRORS as e: 500 self._result = { 501 'error': e, 502 } 503 self._usable = ok != False 504 if self._result.get('status_code', None): 505 # The client worked well enough to get an HTTP status 506 # code, so presumably any problems are just on the 507 # server side and it's OK to reuse the client. 508 self._put_user_agent(curl) 509 else: 510 # Don't return this client to the pool, in case it's 511 # broken. 512 curl.close() 513 if not ok: 514 _logger.debug("Request fail: GET %s => %s: %s", 515 url, type(self._result['error']), str(self._result['error'])) 516 return None 517 if method == "HEAD": 518 _logger.info("HEAD %s: %s bytes", 519 self._result['status_code'], 520 self._result.get('content-length')) 521 if self._result['headers'].get('x-keep-locator'): 522 # This is a response to a remote block copy request, return 523 # the local copy block locator. 
524 return self._result['headers'].get('x-keep-locator') 525 return True 526 527 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)", 528 self._result['status_code'], 529 len(self._result['body']), 530 t.msecs, 531 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0) 532 533 if self.download_counter: 534 self.download_counter.add(len(self._result['body'])) 535 resp_md5 = hashlib.md5(self._result['body']).hexdigest() 536 if resp_md5 != locator.md5sum: 537 _logger.warning("Checksum fail: md5(%s) = %s", 538 url, resp_md5) 539 self._result['error'] = arvados.errors.HttpError( 540 0, 'Checksum fail') 541 return None 542 return self._result['body'] 543 544 def put(self, hash_s, body, timeout=None, headers={}): 545 put_headers = copy.copy(self.put_headers) 546 put_headers.update(headers) 547 url = self.root + hash_s 548 _logger.debug("Request: PUT %s", url) 549 curl = self._get_user_agent() 550 ok = None 551 try: 552 with timer.Timer() as t: 553 self._headers = {} 554 body_reader = BytesIO(body) 555 response_body = BytesIO() 556 curl.setopt(pycurl.NOSIGNAL, 1) 557 curl.setopt(pycurl.OPENSOCKETFUNCTION, 558 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 559 curl.setopt(pycurl.URL, url.encode('utf-8')) 560 # Using UPLOAD tells cURL to wait for a "go ahead" from the 561 # Keep server (in the form of a HTTP/1.1 "100 Continue" 562 # response) instead of sending the request body immediately. 563 # This allows the server to reject the request if the request 564 # is invalid or the server is read-only, without waiting for 565 # the client to send the entire block. 566 curl.setopt(pycurl.UPLOAD, True) 567 curl.setopt(pycurl.INFILESIZE, len(body)) 568 curl.setopt(pycurl.READFUNCTION, body_reader.read) 569 curl.setopt(pycurl.HTTPHEADER, [ 570 '{}: {}'.format(k,v) for k,v in put_headers.items()]) 571 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 572 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 573 if self.insecure: 574 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 575 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 576 else: 577 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 578 self._setcurltimeouts(curl, timeout) 579 try: 580 curl.perform() 581 except Exception as e: 582 raise arvados.errors.HttpError(0, str(e)) 583 finally: 584 if self._socket: 585 self._socket.close() 586 self._socket = None 587 self._result = { 588 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 589 'body': response_body.getvalue().decode('utf-8'), 590 'headers': self._headers, 591 'error': False, 592 } 593 ok = retry.check_http_response_success(self._result['status_code']) 594 if not ok: 595 self._result['error'] = arvados.errors.HttpError( 596 self._result['status_code'], 597 self._headers.get('x-status-line', 'Error')) 598 except self.HTTP_ERRORS as e: 599 self._result = { 600 'error': e, 601 } 602 self._usable = ok != False # still usable if ok is True or None 603 if self._result.get('status_code', None): 604 # Client is functional. See comment in get(). 605 self._put_user_agent(curl) 606 else: 607 curl.close() 608 if not ok: 609 _logger.debug("Request fail: PUT %s => %s: %s", 610 url, type(self._result['error']), str(self._result['error'])) 611 return False 612 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)", 613 self._result['status_code'], 614 len(body), 615 t.msecs, 616 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0) 617 if self.upload_counter: 618 self.upload_counter.add(len(body)) 619 return True
Make requests to a single Keep service, and track results.
A KeepService is intended to last long enough to perform one transaction (GET or PUT) against one Keep service. This can involve calling either get() or put() multiple times in order to retry after transient failures. However, calling both get() and put() on a single instance – or using the same instance to access two different Keep services – will not produce sensible behavior.
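An internal-use sketch only (KeepService is not part of the public API, and a real request would also need the Authorization header that KeepClient normally supplies; loc_s and the root URI are placeholders):

    ks = KeepClient.KeepService('https://keep0.example/', insecure=False)
    if ks.usable():
        body = ks.get(KeepLocator(loc_s))   # bytes on success, None on failure
        result = ks.last_result()           # status code, headers, body, error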
407 def __init__(self, root, user_agent_pool=queue.LifoQueue(), 408 upload_counter=None, 409 download_counter=None, 410 headers={}, 411 insecure=False): 412 super(KeepClient.KeepService, self).__init__() 413 self.root = root 414 self._user_agent_pool = user_agent_pool 415 self._result = {'error': None} 416 self._usable = True 417 self._session = None 418 self._socket = None 419 self.get_headers = {'Accept': 'application/octet-stream'} 420 self.get_headers.update(headers) 421 self.put_headers = headers 422 self.upload_counter = upload_counter 423 self.download_counter = download_counter 424 self.insecure = insecure
430 def finished(self): 431 """Did the request succeed or encounter permanent failure?""" 432 return self._result['error'] == False or not self._usable
Did the request succeed or encounter permanent failure?
450 def get(self, locator, method="GET", timeout=None): 451 # locator is a KeepLocator object. 452 url = self.root + str(locator) 453 _logger.debug("Request: %s %s", method, url) 454 curl = self._get_user_agent() 455 ok = None 456 try: 457 with timer.Timer() as t: 458 self._headers = {} 459 response_body = BytesIO() 460 curl.setopt(pycurl.NOSIGNAL, 1) 461 curl.setopt(pycurl.OPENSOCKETFUNCTION, 462 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 463 curl.setopt(pycurl.URL, url.encode('utf-8')) 464 curl.setopt(pycurl.HTTPHEADER, [ 465 '{}: {}'.format(k,v) for k,v in self.get_headers.items()]) 466 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 467 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 468 if self.insecure: 469 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 470 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 471 else: 472 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 473 if method == "HEAD": 474 curl.setopt(pycurl.NOBODY, True) 475 else: 476 curl.setopt(pycurl.HTTPGET, True) 477 self._setcurltimeouts(curl, timeout, method=="HEAD") 478 479 try: 480 curl.perform() 481 except Exception as e: 482 raise arvados.errors.HttpError(0, str(e)) 483 finally: 484 if self._socket: 485 self._socket.close() 486 self._socket = None 487 self._result = { 488 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 489 'body': response_body.getvalue(), 490 'headers': self._headers, 491 'error': False, 492 } 493 494 ok = retry.check_http_response_success(self._result['status_code']) 495 if not ok: 496 self._result['error'] = arvados.errors.HttpError( 497 self._result['status_code'], 498 self._headers.get('x-status-line', 'Error')) 499 except self.HTTP_ERRORS as e: 500 self._result = { 501 'error': e, 502 } 503 self._usable = ok != False 504 if self._result.get('status_code', None): 505 # The client worked well enough to get an HTTP status 506 # code, so presumably any problems are just on the 507 # server side and it's OK to reuse the client. 508 self._put_user_agent(curl) 509 else: 510 # Don't return this client to the pool, in case it's 511 # broken. 512 curl.close() 513 if not ok: 514 _logger.debug("Request fail: GET %s => %s: %s", 515 url, type(self._result['error']), str(self._result['error'])) 516 return None 517 if method == "HEAD": 518 _logger.info("HEAD %s: %s bytes", 519 self._result['status_code'], 520 self._result.get('content-length')) 521 if self._result['headers'].get('x-keep-locator'): 522 # This is a response to a remote block copy request, return 523 # the local copy block locator. 524 return self._result['headers'].get('x-keep-locator') 525 return True 526 527 _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)", 528 self._result['status_code'], 529 len(self._result['body']), 530 t.msecs, 531 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0) 532 533 if self.download_counter: 534 self.download_counter.add(len(self._result['body'])) 535 resp_md5 = hashlib.md5(self._result['body']).hexdigest() 536 if resp_md5 != locator.md5sum: 537 _logger.warning("Checksum fail: md5(%s) = %s", 538 url, resp_md5) 539 self._result['error'] = arvados.errors.HttpError( 540 0, 'Checksum fail') 541 return None 542 return self._result['body']
544 def put(self, hash_s, body, timeout=None, headers={}): 545 put_headers = copy.copy(self.put_headers) 546 put_headers.update(headers) 547 url = self.root + hash_s 548 _logger.debug("Request: PUT %s", url) 549 curl = self._get_user_agent() 550 ok = None 551 try: 552 with timer.Timer() as t: 553 self._headers = {} 554 body_reader = BytesIO(body) 555 response_body = BytesIO() 556 curl.setopt(pycurl.NOSIGNAL, 1) 557 curl.setopt(pycurl.OPENSOCKETFUNCTION, 558 lambda *args, **kwargs: self._socket_open(*args, **kwargs)) 559 curl.setopt(pycurl.URL, url.encode('utf-8')) 560 # Using UPLOAD tells cURL to wait for a "go ahead" from the 561 # Keep server (in the form of a HTTP/1.1 "100 Continue" 562 # response) instead of sending the request body immediately. 563 # This allows the server to reject the request if the request 564 # is invalid or the server is read-only, without waiting for 565 # the client to send the entire block. 566 curl.setopt(pycurl.UPLOAD, True) 567 curl.setopt(pycurl.INFILESIZE, len(body)) 568 curl.setopt(pycurl.READFUNCTION, body_reader.read) 569 curl.setopt(pycurl.HTTPHEADER, [ 570 '{}: {}'.format(k,v) for k,v in put_headers.items()]) 571 curl.setopt(pycurl.WRITEFUNCTION, response_body.write) 572 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) 573 if self.insecure: 574 curl.setopt(pycurl.SSL_VERIFYPEER, 0) 575 curl.setopt(pycurl.SSL_VERIFYHOST, 0) 576 else: 577 curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) 578 self._setcurltimeouts(curl, timeout) 579 try: 580 curl.perform() 581 except Exception as e: 582 raise arvados.errors.HttpError(0, str(e)) 583 finally: 584 if self._socket: 585 self._socket.close() 586 self._socket = None 587 self._result = { 588 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), 589 'body': response_body.getvalue().decode('utf-8'), 590 'headers': self._headers, 591 'error': False, 592 } 593 ok = retry.check_http_response_success(self._result['status_code']) 594 if not ok: 595 self._result['error'] = arvados.errors.HttpError( 596 self._result['status_code'], 597 self._headers.get('x-status-line', 'Error')) 598 except self.HTTP_ERRORS as e: 599 self._result = { 600 'error': e, 601 } 602 self._usable = ok != False # still usable if ok is True or None 603 if self._result.get('status_code', None): 604 # Client is functional. See comment in get(). 605 self._put_user_agent(curl) 606 else: 607 curl.close() 608 if not ok: 609 _logger.debug("Request fail: PUT %s => %s: %s", 610 url, type(self._result['error']), str(self._result['error'])) 611 return False 612 _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)", 613 self._result['status_code'], 614 len(body), 615 t.msecs, 616 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0) 617 if self.upload_counter: 618 self.upload_counter.add(len(body)) 619 return True
622 class KeepWriterQueue(queue.Queue): 623 def __init__(self, copies, classes=[]): 624 queue.Queue.__init__(self) # Old-style superclass 625 self.wanted_copies = copies 626 self.wanted_storage_classes = classes 627 self.successful_copies = 0 628 self.confirmed_storage_classes = {} 629 self.response = None 630 self.storage_classes_tracking = True 631 self.queue_data_lock = threading.RLock() 632 self.pending_tries = max(copies, len(classes)) 633 self.pending_tries_notification = threading.Condition() 634 635 def write_success(self, response, replicas_nr, classes_confirmed): 636 with self.queue_data_lock: 637 self.successful_copies += replicas_nr 638 if classes_confirmed is None: 639 self.storage_classes_tracking = False 640 elif self.storage_classes_tracking: 641 for st_class, st_copies in classes_confirmed.items(): 642 try: 643 self.confirmed_storage_classes[st_class] += st_copies 644 except KeyError: 645 self.confirmed_storage_classes[st_class] = st_copies 646 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes())) 647 self.response = response 648 with self.pending_tries_notification: 649 self.pending_tries_notification.notify_all() 650 651 def write_fail(self, ks): 652 with self.pending_tries_notification: 653 self.pending_tries += 1 654 self.pending_tries_notification.notify() 655 656 def pending_copies(self): 657 with self.queue_data_lock: 658 return self.wanted_copies - self.successful_copies 659 660 def satisfied_classes(self): 661 with self.queue_data_lock: 662 if not self.storage_classes_tracking: 663 # Notifies disabled storage classes expectation to 664 # the outer loop. 665 return None 666 return list(set(self.wanted_storage_classes) - set(self.pending_classes())) 667 668 def pending_classes(self): 669 with self.queue_data_lock: 670 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None): 671 return [] 672 unsatisfied_classes = copy.copy(self.wanted_storage_classes) 673 for st_class, st_copies in self.confirmed_storage_classes.items(): 674 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies: 675 unsatisfied_classes.remove(st_class) 676 return unsatisfied_classes 677 678 def get_next_task(self): 679 with self.pending_tries_notification: 680 while True: 681 if self.pending_copies() < 1 and len(self.pending_classes()) == 0: 682 # This notify_all() is unnecessary -- 683 # write_success() already called notify_all() 684 # when pending<1 became true, so it's not 685 # possible for any other thread to be in 686 # wait() now -- but it's cheap insurance 687 # against deadlock so we do it anyway: 688 self.pending_tries_notification.notify_all() 689 # Drain the queue and then raise Queue.Empty 690 while True: 691 self.get_nowait() 692 self.task_done() 693 elif self.pending_tries > 0: 694 service, service_root = self.get_nowait() 695 if service.finished(): 696 self.task_done() 697 continue 698 self.pending_tries -= 1 699 return service, service_root 700 elif self.empty(): 701 self.pending_tries_notification.notify_all() 702 raise queue.Empty 703 else: 704 self.pending_tries_notification.wait()
A queue.Queue of pending write tasks for KeepWriterThread workers. It tracks how many copies have been written successfully, which storage classes have been confirmed, and how many further write attempts are still worth dispatching.
623 def __init__(self, copies, classes=[]): 624 queue.Queue.__init__(self) # Old-style superclass 625 self.wanted_copies = copies 626 self.wanted_storage_classes = classes 627 self.successful_copies = 0 628 self.confirmed_storage_classes = {} 629 self.response = None 630 self.storage_classes_tracking = True 631 self.queue_data_lock = threading.RLock() 632 self.pending_tries = max(copies, len(classes)) 633 self.pending_tries_notification = threading.Condition()
635 def write_success(self, response, replicas_nr, classes_confirmed): 636 with self.queue_data_lock: 637 self.successful_copies += replicas_nr 638 if classes_confirmed is None: 639 self.storage_classes_tracking = False 640 elif self.storage_classes_tracking: 641 for st_class, st_copies in classes_confirmed.items(): 642 try: 643 self.confirmed_storage_classes[st_class] += st_copies 644 except KeyError: 645 self.confirmed_storage_classes[st_class] = st_copies 646 self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes())) 647 self.response = response 648 with self.pending_tries_notification: 649 self.pending_tries_notification.notify_all()
668 def pending_classes(self): 669 with self.queue_data_lock: 670 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None): 671 return [] 672 unsatisfied_classes = copy.copy(self.wanted_storage_classes) 673 for st_class, st_copies in self.confirmed_storage_classes.items(): 674 if st_class in unsatisfied_classes and st_copies >= self.wanted_copies: 675 unsatisfied_classes.remove(st_class) 676 return unsatisfied_classes
678 def get_next_task(self): 679 with self.pending_tries_notification: 680 while True: 681 if self.pending_copies() < 1 and len(self.pending_classes()) == 0: 682 # This notify_all() is unnecessary -- 683 # write_success() already called notify_all() 684 # when pending<1 became true, so it's not 685 # possible for any other thread to be in 686 # wait() now -- but it's cheap insurance 687 # against deadlock so we do it anyway: 688 self.pending_tries_notification.notify_all() 689 # Drain the queue and then raise Queue.Empty 690 while True: 691 self.get_nowait() 692 self.task_done() 693 elif self.pending_tries > 0: 694 service, service_root = self.get_nowait() 695 if service.finished(): 696 self.task_done() 697 continue 698 self.pending_tries -= 1 699 return service, service_root 700 elif self.empty(): 701 self.pending_tries_notification.notify_all() 702 raise queue.Empty 703 else: 704 self.pending_tries_notification.wait()
707 class KeepWriterThreadPool(object): 708 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]): 709 self.total_task_nr = 0 710 if (not max_service_replicas) or (max_service_replicas >= copies): 711 num_threads = 1 712 else: 713 num_threads = int(math.ceil(1.0*copies/max_service_replicas)) 714 _logger.debug("Pool max threads is %d", num_threads) 715 self.workers = [] 716 self.queue = KeepClient.KeepWriterQueue(copies, classes) 717 # Create workers 718 for _ in range(num_threads): 719 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout) 720 self.workers.append(w) 721 722 def add_task(self, ks, service_root): 723 self.queue.put((ks, service_root)) 724 self.total_task_nr += 1 725 726 def done(self): 727 return self.queue.successful_copies, self.queue.satisfied_classes() 728 729 def join(self): 730 # Start workers 731 for worker in self.workers: 732 worker.start() 733 # Wait for finished work 734 self.queue.join() 735 736 def response(self): 737 return self.queue.response
708 def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]): 709 self.total_task_nr = 0 710 if (not max_service_replicas) or (max_service_replicas >= copies): 711 num_threads = 1 712 else: 713 num_threads = int(math.ceil(1.0*copies/max_service_replicas)) 714 _logger.debug("Pool max threads is %d", num_threads) 715 self.workers = [] 716 self.queue = KeepClient.KeepWriterQueue(copies, classes) 717 # Create workers 718 for _ in range(num_threads): 719 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout) 720 self.workers.append(w)
740 class KeepWriterThread(threading.Thread): 741 class TaskFailed(RuntimeError): pass 742 743 def __init__(self, queue, data, data_hash, timeout=None): 744 super(KeepClient.KeepWriterThread, self).__init__() 745 self.timeout = timeout 746 self.queue = queue 747 self.data = data 748 self.data_hash = data_hash 749 self.daemon = True 750 751 def run(self): 752 while True: 753 try: 754 service, service_root = self.queue.get_next_task() 755 except queue.Empty: 756 return 757 try: 758 locator, copies, classes = self.do_task(service, service_root) 759 except Exception as e: 760 if not isinstance(e, self.TaskFailed): 761 _logger.exception("Exception in KeepWriterThread") 762 self.queue.write_fail(service) 763 else: 764 self.queue.write_success(locator, copies, classes) 765 finally: 766 self.queue.task_done() 767 768 def do_task(self, service, service_root): 769 classes = self.queue.pending_classes() 770 headers = {} 771 if len(classes) > 0: 772 classes.sort() 773 headers['X-Keep-Storage-Classes'] = ', '.join(classes) 774 success = bool(service.put(self.data_hash, 775 self.data, 776 timeout=self.timeout, 777 headers=headers)) 778 result = service.last_result() 779 780 if not success: 781 if result.get('status_code'): 782 _logger.debug("Request fail: PUT %s => %s %s", 783 self.data_hash, 784 result.get('status_code'), 785 result.get('body')) 786 raise self.TaskFailed() 787 788 _logger.debug("KeepWriterThread %s succeeded %s+%i %s", 789 str(threading.current_thread()), 790 self.data_hash, 791 len(self.data), 792 service_root) 793 try: 794 replicas_stored = int(result['headers']['x-keep-replicas-stored']) 795 except (KeyError, ValueError): 796 replicas_stored = 1 797 798 classes_confirmed = {} 799 try: 800 scch = result['headers']['x-keep-storage-classes-confirmed'] 801 for confirmation in scch.replace(' ', '').split(','): 802 if '=' in confirmation: 803 stored_class, stored_copies = confirmation.split('=')[:2] 804 classes_confirmed[stored_class] = int(stored_copies) 805 except (KeyError, ValueError): 806 # Storage classes confirmed header missing or corrupt 807 classes_confirmed = None 808 809 return result['body'].strip(), replicas_stored, classes_confirmed
A worker thread that repeatedly takes (service, service_root) tasks from a KeepWriterQueue, performs the PUT via do_task(), and reports success or failure back to the queue until the queue is drained.
743 def __init__(self, queue, data, data_hash, timeout=None): 744 super(KeepClient.KeepWriterThread, self).__init__() 745 self.timeout = timeout 746 self.queue = queue 747 self.data = data 748 self.data_hash = data_hash 749 self.daemon = True
Arguments:
- queue: The KeepWriterQueue this worker pulls tasks from and reports results to.
- data: The block content (bytes) to upload.
- data_hash: The MD5 hex digest of data, used as the block locator for the PUT request.
- timeout: Timeout value passed through to KeepService.put(). Defaults to None.
1108 @property 1109 def daemon(self): 1110 """A boolean value indicating whether this thread is a daemon thread. 1111 1112 This must be set before start() is called, otherwise RuntimeError is 1113 raised. Its initial value is inherited from the creating thread; the 1114 main thread is not a daemon thread and therefore all threads created in 1115 the main thread default to daemon = False. 1116 1117 The entire Python program exits when only daemon threads are left. 1118 1119 """ 1120 assert self._initialized, "Thread.__init__() not called" 1121 return self._daemonic
A boolean value indicating whether this thread is a daemon thread.
This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when only daemon threads are left.
751 def run(self): 752 while True: 753 try: 754 service, service_root = self.queue.get_next_task() 755 except queue.Empty: 756 return 757 try: 758 locator, copies, classes = self.do_task(service, service_root) 759 except Exception as e: 760 if not isinstance(e, self.TaskFailed): 761 _logger.exception("Exception in KeepWriterThread") 762 self.queue.write_fail(service) 763 else: 764 self.queue.write_success(locator, copies, classes) 765 finally: 766 self.queue.task_done()
Method representing the thread’s activity: repeatedly take the next (service, service_root) task from the queue, attempt the write with do_task(), report the outcome back to the queue, and return when the queue raises queue.Empty.
768 def do_task(self, service, service_root): 769 classes = self.queue.pending_classes() 770 headers = {} 771 if len(classes) > 0: 772 classes.sort() 773 headers['X-Keep-Storage-Classes'] = ', '.join(classes) 774 success = bool(service.put(self.data_hash, 775 self.data, 776 timeout=self.timeout, 777 headers=headers)) 778 result = service.last_result() 779 780 if not success: 781 if result.get('status_code'): 782 _logger.debug("Request fail: PUT %s => %s %s", 783 self.data_hash, 784 result.get('status_code'), 785 result.get('body')) 786 raise self.TaskFailed() 787 788 _logger.debug("KeepWriterThread %s succeeded %s+%i %s", 789 str(threading.current_thread()), 790 self.data_hash, 791 len(self.data), 792 service_root) 793 try: 794 replicas_stored = int(result['headers']['x-keep-replicas-stored']) 795 except (KeyError, ValueError): 796 replicas_stored = 1 797 798 classes_confirmed = {} 799 try: 800 scch = result['headers']['x-keep-storage-classes-confirmed'] 801 for confirmation in scch.replace(' ', '').split(','): 802 if '=' in confirmation: 803 stored_class, stored_copies = confirmation.split('=')[:2] 804 classes_confirmed[stored_class] = int(stored_copies) 805 except (KeyError, ValueError): 806 # Storage classes confirmed header missing or corrupt 807 classes_confirmed = None 808 809 return result['body'].strip(), replicas_stored, classes_confirmed
741 class TaskFailed(RuntimeError): pass
Internal exception raised by do_task() when a single PUT attempt to one Keep service fails.