Package arvados :: Module keep

Source Code for Module arvados.keep

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from __future__ import division
from future import standard_library
from future.utils import native_str
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import collections
import datetime
import hashlib
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
from . import timer
import urllib.parse

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except IndexError:
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
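

# Illustrative sketch (not part of the original module): how KeepLocator
# parses and re-serializes a locator string. The hash below is the MD5 of the
# empty string; 'Khint' is a made-up service hint.
def _keep_locator_example():
    loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0+Khint')
    assert loc.md5sum == 'd41d8cd98f00b204e9800998ecf8427e'
    assert loc.size == 0
    assert loc.hints == ['Khint']
    assert loc.stripped() == 'd41d8cd98f00b204e9800998ecf8427e+0'
    return str(loc)  # 'd41d8cd98f00b204e9800998ecf8427e+0+Khint'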


class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
    own API client. The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
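

# Illustrative sketch (not part of the original module): the intended
# reserve/fill pattern. reserve_cache() returns (slot, is_new); the first
# caller fills the slot while any concurrent readers block in slot.get()
# until set() is called.
def _block_cache_example():
    cache = KeepBlockCache(cache_max=64 * 1024 * 1024)
    slot, first = cache.reserve_cache('d41d8cd98f00b204e9800998ecf8427e')
    if first:
        slot.set(b'')        # fill with block data (or None on read error)
    data = slot.get()        # blocks until some thread calls set()
    cache.cap_cache()        # drop ready slots, oldest first, to cache_max
    return data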


class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 256 seconds
    # Default Keep server bandwidth minimum: 32768 bytes per second
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 256 seconds
    # Default Keep proxy bandwidth minimum: 32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)


    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()
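
        # The two helpers above implement a small pycurl handle pool: idle
        # handles are parked in a LifoQueue so later requests can reuse warm
        # connections, and reset() clears per-request options before a handle
        # is returned. Illustrative usage (not part of the original module):
        #
        #     curl = self._get_user_agent()    # pooled handle, or a new Curl
        #     try:
        #         ...                          # perform one request
        #     finally:
        #         self._put_user_agent(curl)   # park it for reuse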

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._result.get('content-length'))
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of an HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
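
        # Illustrative mapping (not part of the original module): how the
        # accepted timeout forms translate to cURL options. With the default
        # bandwidth floor of 32768 B/s, a transfer is aborted if throughput
        # stays below the floor for the whole read-timeout interval.
        #
        #     timeouts=5            -> CONNECTTIMEOUT_MS=5000, LOW_SPEED_TIME=5,
        #                              LOW_SPEED_LIMIT=32768
        #     timeouts=(2, 256)     -> CONNECTTIMEOUT_MS=2000, LOW_SPEED_TIME=256,
        #                              LOW_SPEED_LIMIT=32768
        #     timeouts=(2, 256, 64) -> CONNECTTIMEOUT_MS=2000, LOW_SPEED_TIME=256,
        #                              LOW_SPEED_LIMIT=64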

        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written


    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            self.response = None
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response
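
    # Illustrative sketch (not part of the original module): how put() below
    # drives the pool. Tasks are (service, service_root) pairs; join() starts
    # the workers and blocks until the queue drains. 'services' is a
    # hypothetical {root: KeepService} mapping.
    #
    #     pool = KeepClient.KeepWriterThreadPool(
    #         data=data, data_hash=data_hash, copies=2,
    #         max_service_replicas=1)
    #     for root, ks in services.items():
    #         pool.add_task(ks, root)
    #     pool.join()
    #     if pool.done() >= 2:
    #         response = pool.response()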


    class KeepWriterThread(threading.Thread):
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
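
    # Illustrative sketch (not part of the original module): typical
    # construction and use; configuration comes from ARVADOS_API_HOST and
    # ARVADOS_API_TOKEN.
    #
    #     api = arvados.api('v1')
    #     kc = KeepClient(api_client=api, num_retries=3)
    #     loc = kc.put(b'hello')            # e.g. '5d41...+5' plus hints
    #     assert kc.get(loc) == b'hello'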

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise. The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
                (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
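
    # Illustrative sketch (not part of the original module): this weight is
    # the key for rendezvous (highest-random-weight) hashing. Each block gets
    # its own service ordering, but the ordering for a given block is stable,
    # so readers probe services in the same order writers used. 'uuids' below
    # is a hypothetical list of service UUIDs:
    #
    #     probe_order = sorted(
    #         uuids, reverse=True,
    #         key=lambda u: self._service_weight(data_hash, u))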

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if not first:
                    self.hits_counter.add(1)
                    blob = slot.get()
                    if blob is None:
                        raise arvados.errors.KeepReadError(
                            "failed to read {}".format(loc_s))
                    return blob

            self.misses_counter.add(1)

            headers = {
                'X-Request-Id': (request_id or
                                 (hasattr(self, 'api_client') and self.api_client.request_id) or
                                 arvados.util.new_request_id()),
            }

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:])
                               )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation. Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            roots_map = {}
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                if method == "HEAD":
                    return True
                else:
                    return blob
        finally:
            if slot is not None:
                slot.set(blob)
                self.block_cache.cap_cache()

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")
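
    # Illustrative sketch (not part of the original module): failure modes a
    # caller of get()/head() can distinguish. The locator is a placeholder.
    #
    #     try:
    #         data = kc.get('d41d8cd98f00b204e9800998ecf8427e+0')
    #     except arvados.errors.NotFoundError:
    #         ...  # every service answered 403/404/410
    #     except arvados.errors.KeepReadError:
    #         ...  # no services available, or a mix of other errors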

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {
            'X-Request-Id': (request_id or
                             (hasattr(self, 'api_client') and self.api_client.request_id) or
                             arvados.util.new_request_id()),
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                          data_hash=data_hash,
                                                          copies=copies - done,
                                                          max_service_replicas=self.max_replicas_per_service,
                                                          timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
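
    # Illustrative sketch (not part of the original module): a successful
    # put() returns the response body, a signed locator such as
    # 'acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>' (placeholder
    # signature and expiry shown):
    #
    #     locator = kc.put(b'foo', copies=2)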

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()
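
    # Illustrative sketch (not part of the original module): the local-store
    # round trip used for testing. KEEP_LOCAL_STORE may be any existing
    # writable directory; '/tmp/keep' is a placeholder.
    #
    #     os.environ['KEEP_LOCAL_STORE'] = '/tmp/keep'
    #     kc = KeepClient()        # get/put now read and write that directory
    #     loc = kc.put(b'spam')    # writes /tmp/keep/<md5 of data>
    #     assert kc.get(loc) == b'spam'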

    def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)