arvados.keep

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import copy
import collections
import datetime
import hashlib
import errno
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
import urllib.parse
import traceback
import weakref

from io import BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

from ._internal import basedirs, diskcache, Timer
from ._internal.pycurl import PyCurlHelper

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
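
# Illustrative sketch (not from the original source): these constants are
# the standard setsockopt() arguments for tuning TCP keepalive, e.g. in a
# socket-open callback (the numeric values here are arbitrary examples):
#
#     s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
#     s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 75)  # idle seconds
#     s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)  # probe interval
#     s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 9)     # probe count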

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # A missing '@' makes the tuple unpacking above raise
            # ValueError (not IndexError), as do the hex validators.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt

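# A hedged usage sketch (the hash is md5 of the empty string; the
# signature and expiry are illustrative placeholders, not real values):
#
#     loc = KeepLocator(
#         'd41d8cd98f00b204e9800998ecf8427e+0'
#         '+A0123456789abcdef0123456789abcdef01234567@565f8c00')
#     loc.md5sum             # 'd41d8cd98f00b204e9800998ecf8427e'
#     loc.size               # 0
#     loc.stripped()         # 'd41d8cd98f00b204e9800998ecf8427e+0'
#     str(loc)               # round-trips the locator, hints included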

class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
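
        # Worked example (illustrative numbers): with NOFILE = 1024,
        # max_slots = 1024 / 8 = 128.  On a 100 GiB filesystem with
        # 40 GiB free and an empty cache:
        #   maxdisk   = 100 GiB * 0.10 = 10 GiB
        #   avail     = 40 GiB / 4     = 10 GiB
        #   slots cap = 128 * 64 MiB   = 8 GiB
        # so cache_max = min(10 GiB, 10 GiB, 8 GiB) = 8 GiB.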

        self.cache_total = 0
        if self._disk_cache:
            self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()

    class _CacheSlot:
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None


    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                continue

            sz = slot.size()
            slot.evict()
            self.cache_total -= sz
            del self._cache[slot.locator]


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache._CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

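    # A hedged sketch of the reserve/fill protocol (`fetch_block` is a
    # hypothetical stand-in for a real Keep GET):
    #
    #     slot, first = cache.reserve_cache(loc)
    #     if first:
    #         # We own the slot: fetch the block, then publish it so any
    #         # threads blocked in slot.get() wake up.
    #         cache.set(slot, fetch_block(loc))
    #     data = slot.get()   # blocks until some thread calls set()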
    def set(self, slot, blob):
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception:
            # Swallow any other error here; the retry below gets one
            # more chance before giving up.
            pass
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time.  The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which may allow the operation to
            # succeed on retry.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()


class _Counter:
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class _KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A _KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """
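
        # A hedged lifecycle sketch (not from the original source; `root`,
        # `hdrs`, and `loc` are assumed to be a service base URL, a header
        # dict, and a KeepLocator; the bounded retry policy lives in
        # KeepClient's RetryLoop):
        #
        #     ks = KeepClient._KeepService(root, headers=hdrs)
        #     body = ks.get(loc, timeout=(2, 256, 32768))
        #     if body is None and ks.usable():
        #         body = ks.get(loc, timeout=(4, 256, 32768))  # one retry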

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super().__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result['headers'].get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True


    class _KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Signal to the outer loop that storage class
                    # tracking is disabled.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()

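    # Worked example (illustrative numbers): with wanted_copies=2 and
    # classes=['default'], pending_tries starts at max(2, 1) == 2.  If a
    # single PUT succeeds with replicas_nr=2 and classes_confirmed of
    # {'default': 2}, then pending_copies() == 0 and pending_classes()
    # == [], so get_next_task() drains the queue and the workers exit.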

    class _KeepWriterThreadPool:
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient._KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

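    # A hedged usage sketch (`sorted_roots` and `roots_map` are assumed
    # to come from map_new_services()):
    #
    #     pool = KeepClient._KeepWriterThreadPool(
    #         data, data_hash, copies=2, max_service_replicas=1)
    #     for root in sorted_roots:
    #         pool.add_task(roots_map[root], root)
    #     pool.join()   # starts the workers, then waits for the queue to drain
    #     copies_done, classes_done = pool.done()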

    class _KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError):
            """Exception for failed Keep writes

            TODO: Move this class to the module top level and document it

            @private
            """


        def __init__(self, queue, data, data_hash, timeout=None):
            super().__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in _KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("_KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed

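    # Worked example for the header parsing above (illustrative value):
    # 'x-keep-storage-classes-confirmed: default=2, archive=1' parses to
    # {'default': 2, 'archive': 1}.  A missing or malformed header yields
    # classes_confirmed = None, which disables class tracking upstream.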

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = _Counter()
        self.download_counter = _Counter()
        self.put_counter = _Counter()
        self.get_counter = _Counter()
        self.hits_counter = _Counter()
        self.misses_counter = _Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # _KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
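
    # Worked example (using the documented default timeout (2, 256, 32768)):
    #   attempt 0 -> (2, 256, 32768)
    #   attempt 1 -> (4, 256, 32768)
    #   attempt 3 -> (16, 256, 32768)
    # Only the connection timeout grows; the read timeout and minimum
    # bandwidth stay fixed.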

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
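
    # A hedged sketch of the probe order this rendezvous-style weight
    # produces (the service UUIDs are made up):
    #
    #     services = ['zzzzz-bi6l4-000000000000001',
    #                 'zzzzz-bi6l4-000000000000002']
    #     order = sorted(
    #         services, reverse=True,
    #         key=lambda u: hashlib.md5((data_hash + u[-15:]).encode()).hexdigest())
    #
    # Every client computes the same order for a given block, so probes
    # concentrate on the same servers without central coordination.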

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to _KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "Bearer %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self._KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None
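
    # The loop protocol in miniature (hedged; the values are illustrative).
    # Each iteration calls loop.save_result() with either an Exception
    # (-> None, retry) or a (result, servers_tried) tuple:
    #   (b'data', 5) -> True   (got a result; stop)
    #   (None, 0)    -> False  (no servers left to try; stop)
    #   (None, 3)    -> None   (temporary failure; retry with backoff)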

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache; otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            # get() returns bytes, so join with a bytes separator.
            return b''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # This is a prefetch request to fill in the
                        # cache; we don't need to wait for the result,
                        # so if the fetch is already in flight, return
                        # immediately.  Clear 'slot' to prevent the
                        # finally block from calling slot.set().
1143                        if slot.ready.is_set():
1144                            slot.get()
1145                        slot = None
1146                        return None
1147
1148                    blob = slot.get()
1149                    if blob is not None:
1150                        self.hits_counter.add(1)
1151                        return blob
1152
1153                    # If blob is None, this means either
1154                    #
1155                    # (a) another thread was fetching this block and
1156                    # failed with an error or
1157                    #
1158                    # (b) cache thrashing caused the slot to be
1159                    # evicted (content set to None) by another thread
1160                    # between the call to reserve_cache() and get().
1161                    #
1162                    # We'll handle these cases by reserving a new slot
1163                    # and then doing a full GET request.
1164                    slot = None
1165
1166            self.misses_counter.add(1)
1167
1168            # If the locator has hints specifying a prefix (indicating a
1169            # remote keepproxy) or the UUID of a local gateway service,
1170            # read data from the indicated service(s) instead of the usual
1171            # list of local disk services.
1172            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1173                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1174            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1175                               for hint in locator.hints if (
1176                                       hint.startswith('K@') and
1177                                       len(hint) == 29 and
1178                                       self._gateway_services.get(hint[2:])
1179                                       )])
1180            # Map root URLs to their _KeepService objects.
1181            roots_map = {
1182                root: self._KeepService(root, self._user_agent_pool,
1183                                       upload_counter=self.upload_counter,
1184                                       download_counter=self.download_counter,
1185                                       headers=headers,
1186                                       insecure=self.insecure)
1187                for root in hint_roots
1188            }
1189
1190            # See #3147 for a discussion of the loop implementation.  Highlights:
1191            # * Refresh the list of Keep services after each failure, in case
1192            #   it's being updated.
1193            # * Retry until we succeed, we're out of retries, or every available
1194            #   service has returned permanent failure.
1195            sorted_roots = []
1196            roots_map = {}
1197            loop = retry.RetryLoop(num_retries, self._check_loop_result,
1198                                   backoff_start=2)
1199            for tries_left in loop:
1200                try:
1201                    sorted_roots = self.map_new_services(
1202                        roots_map, locator,
1203                        force_rebuild=(tries_left < num_retries),
1204                        need_writable=False,
1205                        headers=headers)
1206                except Exception as error:
1207                    loop.save_result(error)
1208                    continue
1209
1210                # Query _KeepService objects that haven't returned
1211                # permanent failure, in our specified shuffle order.
1212                services_to_try = [roots_map[root]
1213                                   for root in sorted_roots
1214                                   if roots_map[root].usable()]
1215                for keep_service in services_to_try:
1216                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1217                    if blob is not None:
1218                        break
1219                loop.save_result((blob, len(services_to_try)))
1220
1221            # Always cache the result, then return it if we succeeded.
1222            if loop.success():
1223                return blob
1224        finally:
1225            if slot is not None:
1226                self.block_cache.set(slot, blob)
1227
1228        # Q: Including 403 is necessary for the Keep tests to continue
1229        # passing, but maybe they should expect KeepReadError instead?
1230        not_founds = sum(1 for key in sorted_roots
1231                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1232        service_errors = ((key, roots_map[key].last_result()['error'])
1233                          for key in sorted_roots)
1234        if not roots_map:
1235            raise arvados.errors.KeepReadError(
1236                "[{}] failed to read {}: no Keep services available ({})".format(
1237                    request_id, loc_s, loop.last_result()))
1238        elif not_founds == len(sorted_roots):
1239            raise arvados.errors.NotFoundError(
1240                "[{}] {} not found".format(request_id, loc_s), service_errors)
1241        else:
1242            raise arvados.errors.KeepReadError(
1243                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1244
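    # A hedged caller-side sketch (names are illustrative) of handling
    # the exceptions raised by the read path above:
    #
    #     try:
    #         data = keep_client.get(loc_s)
    #     except arvados.errors.NotFoundError:
    #         ...  # every service answered 403/404/410
    #     except arvados.errors.KeepReadError:
    #         ...  # no services available, or retries exhausted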
1245    @retry.retry_method
1246    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1247        """Save data in Keep.
1248
1249        This method will get a list of Keep services from the API server, and
1250        send the data to each one simultaneously in a new thread.  Once the
1251        uploads are finished, if enough copies are saved, this method returns
1252        the most recent HTTP response body.  If requests fail to upload
1253        enough copies, this method raises KeepWriteError.
1254
1255        Arguments:
1256        * data: The string of data to upload.
1257        * copies: The number of copies that the user requires to be
1258          saved.  Default 2.
1259        * num_retries: The number of times to retry PUT requests to
1260          *each* Keep server if it returns temporary failures, with
1261          exponential backoff.  The default value is set when the
1262          KeepClient is initialized.
1263        * classes: An optional list of storage class names where copies should
1264          be written.
1265        """
1266
1267        classes = classes or self._default_classes
1268
1269        if not isinstance(data, bytes):
1270            data = data.encode()
1271
1272        self.put_counter.add(1)
1273
1274        data_hash = hashlib.md5(data).hexdigest()
1275        loc_s = data_hash + '+' + str(len(data))
1276        if copies < 1:
1277            return loc_s
1278        locator = KeepLocator(loc_s)
1279
1280        request_id = (request_id or
1281                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1282                      arvados.util.new_request_id())
1283        headers = {
1284            'X-Request-Id': request_id,
1285            'X-Keep-Desired-Replicas': str(copies),
1286        }
1287        roots_map = {}
1288        loop = retry.RetryLoop(num_retries, self._check_loop_result,
1289                               backoff_start=2)
1290        done_copies = 0
1291        done_classes = []
1292        for tries_left in loop:
1293            try:
1294                sorted_roots = self.map_new_services(
1295                    roots_map, locator,
1296                    force_rebuild=(tries_left < num_retries),
1297                    need_writable=True,
1298                    headers=headers)
1299            except Exception as error:
1300                loop.save_result(error)
1301                continue
1302
1303            pending_classes = []
1304            if done_classes is not None:
1305                pending_classes = list(set(classes) - set(done_classes))
1306            writer_pool = KeepClient._KeepWriterThreadPool(
1307                data=data,
1308                data_hash=data_hash,
1309                copies=copies - done_copies,
1310                max_service_replicas=self.max_replicas_per_service,
1311                timeout=self.current_timeout(num_retries - tries_left),
1312                classes=pending_classes,
1313            )
1314            for service_root, ks in [(root, roots_map[root])
1315                                     for root in sorted_roots]:
1316                if ks.finished():
1317                    continue
1318                writer_pool.add_task(ks, service_root)
1319            writer_pool.join()
1320            pool_copies, pool_classes = writer_pool.done()
1321            done_copies += pool_copies
1322            if (done_classes is not None) and (pool_classes is not None):
1323                done_classes += pool_classes
1324                loop.save_result(
1325                    (done_copies >= copies and set(done_classes) == set(classes),
1326                    writer_pool.total_task_nr))
1327            else:
1328                # Old keepstore contacted without storage classes support:
1329                # success is determined only by successful copies.
1330                #
1331                # Disable storage classes tracking from this point forward.
1332                if not self._storage_classes_unsupported_warning:
1333                    self._storage_classes_unsupported_warning = True
1334                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1335                done_classes = None
1336                loop.save_result(
1337                    (done_copies >= copies, writer_pool.total_task_nr))
1338
1339        if loop.success():
1340            return writer_pool.response()
1341        if not roots_map:
1342            raise arvados.errors.KeepWriteError(
1343                "[{}] failed to write {}: no Keep services available ({})".format(
1344                    request_id, data_hash, loop.last_result()))
1345        else:
1346            service_errors = ((key, roots_map[key].last_result()['error'])
1347                              for key in sorted_roots
1348                              if roots_map[key].last_result()['error'])
1349            raise arvados.errors.KeepWriteError(
1350                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1351                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1352
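    # A hedged usage sketch (assumes a configured cluster and valid
    # credentials; not part of the class API):
    #
    #     import arvados
    #     kc = arvados.KeepClient(api_client=arvados.api('v1'))
    #     locator = kc.put(b'hello', copies=2)
    #     assert kc.get(locator) == b'hello'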
1353    def _block_prefetch_worker(self):
1354        """The background downloader thread."""
1355        while True:
1356            try:
1357                b = self._prefetch_queue.get()
1358                if b is None:
1359                    return
1360                self.get(b, prefetch=True)
1361            except Exception:
1362                _logger.exception("Exception doing block prefetch")
1363
1364    def _start_prefetch_threads(self):
1365        if self._prefetch_threads is None:
1366            with self.lock:
1367                if self._prefetch_threads is not None:
1368                    return
1369                self._prefetch_queue = queue.Queue()
1370                self._prefetch_threads = []
1371                for _ in range(self.num_prefetch_threads):
1372                    thread = threading.Thread(target=self._block_prefetch_worker)
1373                    self._prefetch_threads.append(thread)
1374                    thread.daemon = True
1375                    thread.start()
1376
1377    def block_prefetch(self, locator):
1378        """
1379        This relies on the fact that KeepClient implements a block cache,
1380        so repeated requests for the same block will not result in repeated
1381        downloads (unless the block is evicted from the cache.)  This method
1382        does not block.
1383        """
1384
1385        if self.block_cache.get(locator) is not None:
1386            return
1387
1388        self._start_prefetch_threads()
1389        self._prefetch_queue.put(locator)
1390
1391    def stop_prefetch_threads(self):
1392        with self.lock:
1393            if self._prefetch_threads is not None:
1394                for t in self._prefetch_threads:
1395                    self._prefetch_queue.put(None)
1396                for t in self._prefetch_threads:
1397                    t.join()
1398            self._prefetch_threads = None
1399            self._prefetch_queue = None
1400
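    # A hedged usage sketch: block_prefetch() warms the block cache so a
    # later get() is a cache hit; stop_prefetch_threads() shuts the
    # workers down cleanly (upcoming_locators is hypothetical):
    #
    #     for loc in upcoming_locators:
    #         keep_client.block_prefetch(loc)
    #     ...
    #     keep_client.stop_prefetch_threads()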
1401    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1402        """A stub for put().
1403
1404        This method is used in place of the real put() method when
1405        using local storage (see constructor's local_store argument).
1406
1407        The copies, num_retries, and classes arguments are ignored: they
1408        are accepted only for the sake of offering the same call signature
1409        as put().
1410
1411        Data stored this way can be retrieved via local_store_get().
1412        """
1413        md5 = hashlib.md5(data).hexdigest()
1414        locator = '%s+%d' % (md5, len(data))
1415        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1416            f.write(data)
1417        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1418                  os.path.join(self.local_store, md5))
1419        return locator
1420
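    # A hedged usage sketch ('/tmp/keep' is an assumed path): with the
    # constructor's local_store argument (or $KEEP_LOCAL_STORE) set,
    # put()/get()/head() dispatch to these local stubs and each block is
    # stored as a file named by its md5 hash:
    #
    #     kc = arvados.KeepClient(local_store='/tmp/keep')
    #     loc = kc.put(b'data')          # calls local_store_put()
    #     assert kc.get(loc) == b'data'  # calls local_store_get()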
1421    def local_store_get(self, loc_s, num_retries=None):
1422        """Companion to local_store_put()."""
1423        try:
1424            locator = KeepLocator(loc_s)
1425        except ValueError:
1426            raise arvados.errors.NotFoundError(
1427                "Invalid data locator: '%s'" % loc_s)
1428        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1429            return b''
1430        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1431            return f.read()
1432
1433    def local_store_head(self, loc_s, num_retries=None):
1434        """Companion to local_store_put()."""
1435        try:
1436            locator = KeepLocator(loc_s)
1437        except ValueError:
1438            raise arvados.errors.NotFoundError(
1439                "Invalid data locator: '%s'" % loc_s)
1440        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1441            return True
1442        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1443            return True
global_client_object = None
class KeepLocator:
 51class KeepLocator(object):
 52    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
 53    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
 54
 55    def __init__(self, locator_str):
 56        self.hints = []
 57        self._perm_sig = None
 58        self._perm_expiry = None
 59        pieces = iter(locator_str.split('+'))
 60        self.md5sum = next(pieces)
 61        try:
 62            self.size = int(next(pieces))
 63        except StopIteration:
 64            self.size = None
 65        for hint in pieces:
 66            if self.HINT_RE.match(hint) is None:
 67                raise ValueError("invalid hint format: {}".format(hint))
 68            elif hint.startswith('A'):
 69                self.parse_permission_hint(hint)
 70            else:
 71                self.hints.append(hint)
 72
 73    def __str__(self):
 74        return '+'.join(
 75            str(s)
 76            for s in [self.md5sum, self.size,
 77                      self.permission_hint()] + self.hints
 78            if s is not None)
 79
 80    def stripped(self):
 81        if self.size is not None:
 82            return "%s+%i" % (self.md5sum, self.size)
 83        else:
 84            return self.md5sum
 85
 86    def _make_hex_prop(name, length):
 87        # Build and return a new property with the given name that
 88        # must be a hex string of the given length.
 89        data_name = '_{}'.format(name)
 90        def getter(self):
 91            return getattr(self, data_name)
 92        def setter(self, hex_str):
 93            if not arvados.util.is_hex(hex_str, length):
 94                raise ValueError("{} is not a {}-digit hex string: {!r}".
 95                                 format(name, length, hex_str))
 96            setattr(self, data_name, hex_str)
 97        return property(getter, setter)
 98
 99    md5sum = _make_hex_prop('md5sum', 32)
100    perm_sig = _make_hex_prop('perm_sig', 40)
101
102    @property
103    def perm_expiry(self):
104        return self._perm_expiry
105
106    @perm_expiry.setter
107    def perm_expiry(self, value):
108        if not arvados.util.is_hex(value, 1, 8):
109            raise ValueError(
110                "permission timestamp must be a hex Unix timestamp: {}".
111                format(value))
112        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
113
114    def permission_hint(self):
115        data = [self.perm_sig, self.perm_expiry]
116        if None in data:
117            return None
118        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
119        return "A{}@{:08x}".format(*data)
120
121    def parse_permission_hint(self, s):
122        try:
123            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
124        except IndexError:
125            raise ValueError("bad permission hint {}".format(s))
126
127    def permission_expired(self, as_of_dt=None):
128        if self.perm_expiry is None:
129            return False
130        elif as_of_dt is None:
131            as_of_dt = datetime.datetime.now()
132        return self.perm_expiry <= as_of_dt
KeepLocator(locator_str)
55    def __init__(self, locator_str):
56        self.hints = []
57        self._perm_sig = None
58        self._perm_expiry = None
59        pieces = iter(locator_str.split('+'))
60        self.md5sum = next(pieces)
61        try:
62            self.size = int(next(pieces))
63        except StopIteration:
64            self.size = None
65        for hint in pieces:
66            if self.HINT_RE.match(hint) is None:
67                raise ValueError("invalid hint format: {}".format(hint))
68            elif hint.startswith('A'):
69                self.parse_permission_hint(hint)
70            else:
71                self.hints.append(hint)
EPOCH_DATETIME = datetime.datetime(1970, 1, 1, 0, 0)
HINT_RE = re.compile('^[A-Z][A-Za-z0-9@_-]+$')
hints
md5sum
90        def getter(self):
91            return getattr(self, data_name)
def stripped(self):
80    def stripped(self):
81        if self.size is not None:
82            return "%s+%i" % (self.md5sum, self.size)
83        else:
84            return self.md5sum
perm_sig
90        def getter(self):
91            return getattr(self, data_name)
perm_expiry
102    @property
103    def perm_expiry(self):
104        return self._perm_expiry
def permission_hint(self):
114    def permission_hint(self):
115        data = [self.perm_sig, self.perm_expiry]
116        if None in data:
117            return None
118        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
119        return "A{}@{:08x}".format(*data)
def parse_permission_hint(self, s):
121    def parse_permission_hint(self, s):
122        try:
123            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
124        except IndexError:
125            raise ValueError("bad permission hint {}".format(s))
def permission_expired(self, as_of_dt=None):
127    def permission_expired(self, as_of_dt=None):
128        if self.perm_expiry is None:
129            return False
130        elif as_of_dt is None:
131            as_of_dt = datetime.datetime.now()
132        return self.perm_expiry <= as_of_dt
class KeepBlockCache:
135class KeepBlockCache(object):
136    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
137        self.cache_max = cache_max
138        self._cache = collections.OrderedDict()
139        self._cache_lock = threading.Lock()
140        self._max_slots = max_slots
141        self._disk_cache = disk_cache
142        self._disk_cache_dir = disk_cache_dir
143        self._cache_updating = threading.Condition(self._cache_lock)
144
145        if self._disk_cache and self._disk_cache_dir is None:
146            self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))
147
148        if self._max_slots == 0:
149            if self._disk_cache:
150                # Each block uses two file descriptors, one used to
151                # open it initially and hold the flock(), and a second
152                # hidden one used by mmap().
153                #
154                # Set max slots to 1/8 of maximum file handles.  This
155                # means we'll use at most 1/4 of total file handles.
156                #
157                # NOFILE typically defaults to 1024 on Linux so this
158                # is 128 slots (256 file handles), which means we can
159                # cache up to 8 GiB of 64 MiB blocks.  This leaves
160                # 768 file handles for sockets and other stuff.
161                #
162                # When we want the ability to have more cache (e.g. in
163                # arv-mount) we'll increase rlimit before calling
164                # this.
165                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
166            else:
167                # RAM cache slots
168                self._max_slots = 512
169
170        if self.cache_max == 0:
171            if self._disk_cache:
172                fs = os.statvfs(self._disk_cache_dir)
173                # Calculation of available space incorporates existing cache usage
174                existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
175                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
176                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
177                # pick smallest of:
178                # 10% of total disk size
179                # 25% of available space
180                # max_slots * 64 MiB
181                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
182            else:
183                # 256 MiB in RAM
184                self.cache_max = (256 * 1024 * 1024)
185
186        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
187
188        self.cache_total = 0
189        if self._disk_cache:
190            self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
191            for slot in self._cache.values():
192                self.cache_total += slot.size()
193            self.cap_cache()
194
195    class _CacheSlot:
196        __slots__ = ("locator", "ready", "content")
197
198        def __init__(self, locator):
199            self.locator = locator
200            self.ready = threading.Event()
201            self.content = None
202
203        def get(self):
204            self.ready.wait()
205            return self.content
206
207        def set(self, value):
208            if self.content is not None:
209                return False
210            self.content = value
211            self.ready.set()
212            return True
213
214        def size(self):
215            if self.content is None:
216                return 0
217            else:
218                return len(self.content)
219
220        def evict(self):
221            self.content = None
222
223
224    def _resize_cache(self, cache_max, max_slots):
225        # Try and make sure the contents of the cache do not exceed
226        # the supplied maximums.
227
228        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
229            return
230
231        _evict_candidates = collections.deque(self._cache.values())
232        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
233            slot = _evict_candidates.popleft()
234            if not slot.ready.is_set():
235                continue
236
237            sz = slot.size()
238            slot.evict()
239            self.cache_total -= sz
240            del self._cache[slot.locator]
241
242
243    def cap_cache(self):
244        '''Cap the cache size to self.cache_max'''
245        with self._cache_updating:
246            self._resize_cache(self.cache_max, self._max_slots)
247            self._cache_updating.notify_all()
248
249    def _get(self, locator):
250        # Test if the locator is already in the cache
251        if locator in self._cache:
252            n = self._cache[locator]
253            if n.ready.is_set() and n.content is None:
254                del self._cache[n.locator]
255                return None
256            self._cache.move_to_end(locator)
257            return n
258        if self._disk_cache:
259            # see if it exists on disk
260            n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
261            if n is not None:
262                self._cache[n.locator] = n
263                self.cache_total += n.size()
264                return n
265        return None
266
267    def get(self, locator):
268        with self._cache_lock:
269            return self._get(locator)
270
271    def reserve_cache(self, locator):
272        '''Reserve a cache slot for the specified locator,
273        or return the existing slot.'''
274        with self._cache_updating:
275            n = self._get(locator)
276            if n:
277                return n, False
278            else:
279                # Add a new cache slot for the locator
280                self._resize_cache(self.cache_max, self._max_slots-1)
281                while len(self._cache) >= self._max_slots:
282                    # If there isn't a slot available, need to wait
283                    # for something to happen that releases one of the
284                    # cache slots.  Idle for 200 ms or woken up by
285                    # another thread
286                    self._cache_updating.wait(timeout=0.2)
287                    self._resize_cache(self.cache_max, self._max_slots-1)
288
289                if self._disk_cache:
290                    n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
291                else:
292                    n = KeepBlockCache._CacheSlot(locator)
293                self._cache[n.locator] = n
294                return n, True
295
296    def set(self, slot, blob):
297        try:
298            if slot.set(blob):
299                self.cache_total += slot.size()
300            return
301        except OSError as e:
302            if e.errno == errno.ENOMEM:
303                # Reduce max slots to current - 4, cap cache and retry
304                with self._cache_lock:
305                    self._max_slots = max(4, len(self._cache) - 4)
306            elif e.errno == errno.ENOSPC:
307                # Reduce disk max space to current - 256 MiB, cap cache and retry
308                with self._cache_lock:
309                    sm = sum(st.size() for st in self._cache.values())
310                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
311            elif e.errno == errno.ENODEV:
312                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
313        except Exception as e:
314            pass
315        finally:
316            # Check if we should evict things from the cache.  Either
317            # because we added a new thing or there was an error and
318            # we possibly adjusted the limits down, so we might need
319            # to push something out.
320            self.cap_cache()
321
322        try:
323            # Only gets here if there was an error the first time. The
324            # exception handler adjusts limits downward in some cases
325            # to free up resources, which would make the operation
326            # succeed.
327            if slot.set(blob):
328                self.cache_total += slot.size()
329        except Exception as e:
330            # It failed again.  Give up.
331            slot.set(None)
332            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
333
334        self.cap_cache()
KeepBlockCache(cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None)
136    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
137        self.cache_max = cache_max
138        self._cache = collections.OrderedDict()
139        self._cache_lock = threading.Lock()
140        self._max_slots = max_slots
141        self._disk_cache = disk_cache
142        self._disk_cache_dir = disk_cache_dir
143        self._cache_updating = threading.Condition(self._cache_lock)
144
145        if self._disk_cache and self._disk_cache_dir is None:
146            self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))
147
148        if self._max_slots == 0:
149            if self._disk_cache:
150                # Each block uses two file descriptors, one used to
151                # open it initially and hold the flock(), and a second
152                # hidden one used by mmap().
153                #
154                # Set max slots to 1/8 of maximum file handles.  This
155                # means we'll use at most 1/4 of total file handles.
156                #
157                # NOFILE typically defaults to 1024 on Linux so this
158                # is 128 slots (256 file handles), which means we can
159                # cache up to 8 GiB of 64 MiB blocks.  This leaves
160                # 768 file handles for sockets and other stuff.
161                #
162                # When we want the ability to have more cache (e.g. in
163                # arv-mount) we'll increase rlimit before calling
164                # this.
165                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
166            else:
167                # RAM cache slots
168                self._max_slots = 512
169
170        if self.cache_max == 0:
171            if self._disk_cache:
172                fs = os.statvfs(self._disk_cache_dir)
173                # Calculation of available space incorporates existing cache usage
174                existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
175                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
176                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
177                # pick smallest of:
178                # 10% of total disk size
179                # 25% of available space
180                # max_slots * 64 MiB
181                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
182            else:
183                # 256 MiB in RAM
184                self.cache_max = (256 * 1024 * 1024)
185
186        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
187
188        self.cache_total = 0
189        if self._disk_cache:
190            self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
191            for slot in self._cache.values():
192                self.cache_total += slot.size()
193            self.cap_cache()
cache_max
cache_total
def cap_cache(self):
243    def cap_cache(self):
244        '''Cap the cache size to self.cache_max'''
245        with self._cache_updating:
246            self._resize_cache(self.cache_max, self._max_slots)
247            self._cache_updating.notify_all()

Cap the cache size to self.cache_max

def get(self, locator):
267    def get(self, locator):
268        with self._cache_lock:
269            return self._get(locator)
def reserve_cache(self, locator):
271    def reserve_cache(self, locator):
272        '''Reserve a cache slot for the specified locator,
273        or return the existing slot.'''
274        with self._cache_updating:
275            n = self._get(locator)
276            if n:
277                return n, False
278            else:
279                # Add a new cache slot for the locator
280                self._resize_cache(self.cache_max, self._max_slots-1)
281                while len(self._cache) >= self._max_slots:
282                    # If there isn't a slot available, need to wait
283                    # for something to happen that releases one of the
284                    # cache slots.  Idle for 200 ms or woken up by
285                    # another thread
286                    self._cache_updating.wait(timeout=0.2)
287                    self._resize_cache(self.cache_max, self._max_slots-1)
288
289                if self._disk_cache:
290                    n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
291                else:
292                    n = KeepBlockCache._CacheSlot(locator)
293                self._cache[n.locator] = n
294                return n, True

Reserve a cache slot for the specified locator, or return the existing slot.

def set(self, slot, blob):
296    def set(self, slot, blob):
297        try:
298            if slot.set(blob):
299                self.cache_total += slot.size()
300            return
301        except OSError as e:
302            if e.errno == errno.ENOMEM:
303                # Reduce max slots to current - 4, cap cache and retry
304                with self._cache_lock:
305                    self._max_slots = max(4, len(self._cache) - 4)
306            elif e.errno == errno.ENOSPC:
307                # Reduce disk max space to current - 256 MiB, cap cache and retry
308                with self._cache_lock:
309                    sm = sum(st.size() for st in self._cache.values())
310                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
311            elif e.errno == errno.ENODEV:
312                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
313        except Exception as e:
314            pass
315        finally:
316            # Check if we should evict things from the cache.  Either
317            # because we added a new thing or there was an error and
318            # we possibly adjusted the limits down, so we might need
319            # to push something out.
320            self.cap_cache()
321
322        try:
323            # Only gets here if there was an error the first time. The
324            # exception handler adjusts limits downward in some cases
325            # to free up resources, which would make the operation
326            # succeed.
327            if slot.set(blob):
328                self.cache_total += slot.size()
329        except Exception as e:
330            # It failed again.  Give up.
331            slot.set(None)
332            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
333
334        self.cap_cache()
class KeepClient:
 351class KeepClient(object):
 352    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
 353    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
 354
 355    class _KeepService(PyCurlHelper):
 356        """Make requests to a single Keep service, and track results.
 357
 358        A _KeepService is intended to last long enough to perform one
 359        transaction (GET or PUT) against one Keep service. This can
 360        involve calling either get() or put() multiple times in order
 361        to retry after transient failures. However, calling both get()
 362        and put() on a single instance -- or using the same instance
 363        to access two different Keep services -- will not produce
 364        sensible behavior.
 365        """
 366
 367        HTTP_ERRORS = (
 368            socket.error,
 369            ssl.SSLError,
 370            arvados.errors.HttpError,
 371        )
 372
 373        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
 374                     upload_counter=None,
 375                     download_counter=None,
 376                     headers={},
 377                     insecure=False):
 378            super().__init__()
 379            self.root = root
 380            self._user_agent_pool = user_agent_pool
 381            self._result = {'error': None}
 382            self._usable = True
 383            self._session = None
 384            self._socket = None
 385            self.get_headers = {'Accept': 'application/octet-stream'}
 386            self.get_headers.update(headers)
 387            self.put_headers = headers
 388            self.upload_counter = upload_counter
 389            self.download_counter = download_counter
 390            self.insecure = insecure
 391
 392        def usable(self):
 393            """Is it worth attempting a request?"""
 394            return self._usable
 395
 396        def finished(self):
 397            """Did the request succeed or encounter permanent failure?"""
 398            return self._result['error'] == False or not self._usable
 399
 400        def last_result(self):
 401            return self._result
 402
 403        def _get_user_agent(self):
 404            try:
 405                return self._user_agent_pool.get(block=False)
 406            except queue.Empty:
 407                return pycurl.Curl()
 408
 409        def _put_user_agent(self, ua):
 410            try:
 411                ua.reset()
 412                self._user_agent_pool.put(ua, block=False)
 413            except:
 414                ua.close()
 415
 416        def get(self, locator, method="GET", timeout=None):
 417            # locator is a KeepLocator object.
 418            url = self.root + str(locator)
 419            _logger.debug("Request: %s %s", method, url)
 420            curl = self._get_user_agent()
 421            ok = None
 422            try:
 423                with Timer() as t:
 424                    self._headers = {}
 425                    response_body = BytesIO()
 426                    curl.setopt(pycurl.NOSIGNAL, 1)
 427                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 428                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 429                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 430                    curl.setopt(pycurl.HTTPHEADER, [
 431                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
 432                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 433                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 434                    if self.insecure:
 435                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 436                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 437                    else:
 438                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 439                    if method == "HEAD":
 440                        curl.setopt(pycurl.NOBODY, True)
 441                    else:
 442                        curl.setopt(pycurl.HTTPGET, True)
 443                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 444
 445                    try:
 446                        curl.perform()
 447                    except Exception as e:
 448                        raise arvados.errors.HttpError(0, str(e))
 449                    finally:
 450                        if self._socket:
 451                            self._socket.close()
 452                            self._socket = None
 453                    self._result = {
 454                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 455                        'body': response_body.getvalue(),
 456                        'headers': self._headers,
 457                        'error': False,
 458                    }
 459
 460                ok = retry.check_http_response_success(self._result['status_code'])
 461                if not ok:
 462                    self._result['error'] = arvados.errors.HttpError(
 463                        self._result['status_code'],
 464                        self._headers.get('x-status-line', 'Error'))
 465            except self.HTTP_ERRORS as e:
 466                self._result = {
 467                    'error': e,
 468                }
 469            self._usable = ok != False
 470            if self._result.get('status_code', None):
 471                # The client worked well enough to get an HTTP status
 472                # code, so presumably any problems are just on the
 473                # server side and it's OK to reuse the client.
 474                self._put_user_agent(curl)
 475            else:
 476                # Don't return this client to the pool, in case it's
 477                # broken.
 478                curl.close()
 479            if not ok:
 480                _logger.debug("Request fail: GET %s => %s: %s",
 481                              url, type(self._result['error']), str(self._result['error']))
 482                return None
 483            if method == "HEAD":
 484                _logger.info("HEAD %s: %s bytes",
 485                         self._result['status_code'],
 486                         self._result.get('content-length'))
 487                if self._result['headers'].get('x-keep-locator'):
 488                    # This is a response to a remote block copy request, return
 489                    # the local copy block locator.
 490                    return self._result['headers'].get('x-keep-locator')
 491                return True
 492
 493            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
 494                         self._result['status_code'],
 495                         len(self._result['body']),
 496                         t.msecs,
 497                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 498
 499            if self.download_counter:
 500                self.download_counter.add(len(self._result['body']))
 501            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
 502            if resp_md5 != locator.md5sum:
 503                _logger.warning("Checksum fail: md5(%s) = %s",
 504                                url, resp_md5)
 505                self._result['error'] = arvados.errors.HttpError(
 506                    0, 'Checksum fail')
 507                return None
 508            return self._result['body']
 509
 510        def put(self, hash_s, body, timeout=None, headers={}):
 511            put_headers = copy.copy(self.put_headers)
 512            put_headers.update(headers)
 513            url = self.root + hash_s
 514            _logger.debug("Request: PUT %s", url)
 515            curl = self._get_user_agent()
 516            ok = None
 517            try:
 518                with Timer() as t:
 519                    self._headers = {}
 520                    body_reader = BytesIO(body)
 521                    response_body = BytesIO()
 522                    curl.setopt(pycurl.NOSIGNAL, 1)
 523                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 524                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 525                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 526                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
 527                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
 528                    # response) instead of sending the request body immediately.
 529                    # This allows the server to reject the request if the request
 530                    # is invalid or the server is read-only, without waiting for
 531                    # the client to send the entire block.
 532                    curl.setopt(pycurl.UPLOAD, True)
 533                    curl.setopt(pycurl.INFILESIZE, len(body))
 534                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
 535                    curl.setopt(pycurl.HTTPHEADER, [
 536                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
 537                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 538                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 539                    if self.insecure:
 540                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 541                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 542                    else:
 543                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 544                    self._setcurltimeouts(curl, timeout)
 545                    try:
 546                        curl.perform()
 547                    except Exception as e:
 548                        raise arvados.errors.HttpError(0, str(e))
 549                    finally:
 550                        if self._socket:
 551                            self._socket.close()
 552                            self._socket = None
 553                    self._result = {
 554                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 555                        'body': response_body.getvalue().decode('utf-8'),
 556                        'headers': self._headers,
 557                        'error': False,
 558                    }
 559                ok = retry.check_http_response_success(self._result['status_code'])
 560                if not ok:
 561                    self._result['error'] = arvados.errors.HttpError(
 562                        self._result['status_code'],
 563                        self._headers.get('x-status-line', 'Error'))
 564            except self.HTTP_ERRORS as e:
 565                self._result = {
 566                    'error': e,
 567                }
 568            self._usable = ok != False # still usable if ok is True or None
 569            if self._result.get('status_code', None):
 570                # Client is functional. See comment in get().
 571                self._put_user_agent(curl)
 572            else:
 573                curl.close()
 574            if not ok:
 575                _logger.debug("Request fail: PUT %s => %s: %s",
 576                              url, type(self._result['error']), str(self._result['error']))
 577                return False
 578            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
 579                         self._result['status_code'],
 580                         len(body),
 581                         t.msecs,
 582                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
 583            if self.upload_counter:
 584                self.upload_counter.add(len(body))
 585            return True
 586
 587
 588    class _KeepWriterQueue(queue.Queue):
 589        def __init__(self, copies, classes=[]):
 590            queue.Queue.__init__(self) # Old-style superclass
 591            self.wanted_copies = copies
 592            self.wanted_storage_classes = classes
 593            self.successful_copies = 0
 594            self.confirmed_storage_classes = {}
 595            self.response = None
 596            self.storage_classes_tracking = True
 597            self.queue_data_lock = threading.RLock()
 598            self.pending_tries = max(copies, len(classes))
 599            self.pending_tries_notification = threading.Condition()
 600
 601        def write_success(self, response, replicas_nr, classes_confirmed):
 602            with self.queue_data_lock:
 603                self.successful_copies += replicas_nr
 604                if classes_confirmed is None:
 605                    self.storage_classes_tracking = False
 606                elif self.storage_classes_tracking:
 607                    for st_class, st_copies in classes_confirmed.items():
 608                        try:
 609                            self.confirmed_storage_classes[st_class] += st_copies
 610                        except KeyError:
 611                            self.confirmed_storage_classes[st_class] = st_copies
 612                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
 613                self.response = response
 614            with self.pending_tries_notification:
 615                self.pending_tries_notification.notify_all()
 616
 617        def write_fail(self, ks):
 618            with self.pending_tries_notification:
 619                self.pending_tries += 1
 620                self.pending_tries_notification.notify()
 621
 622        def pending_copies(self):
 623            with self.queue_data_lock:
 624                return self.wanted_copies - self.successful_copies
 625
 626        def satisfied_classes(self):
 627            with self.queue_data_lock:
 628                if not self.storage_classes_tracking:
 629                    # Notifies disabled storage classes expectation to
 630                    # the outer loop.
 631                    return None
 632            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
 633
 634        def pending_classes(self):
 635            with self.queue_data_lock:
 636                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
 637                    return []
 638                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
 639                for st_class, st_copies in self.confirmed_storage_classes.items():
 640                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
 641                        unsatisfied_classes.remove(st_class)
 642                return unsatisfied_classes
 643
 644        def get_next_task(self):
 645            with self.pending_tries_notification:
 646                while True:
 647                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
 648                        # This notify_all() is unnecessary --
 649                        # write_success() already called notify_all()
 650                        # when pending<1 became true, so it's not
 651                        # possible for any other thread to be in
 652                        # wait() now -- but it's cheap insurance
 653                        # against deadlock so we do it anyway:
 654                        self.pending_tries_notification.notify_all()
 655                        # Drain the queue and then raise Queue.Empty
 656                        while True:
 657                            self.get_nowait()
 658                            self.task_done()
 659                    elif self.pending_tries > 0:
 660                        service, service_root = self.get_nowait()
 661                        if service.finished():
 662                            self.task_done()
 663                            continue
 664                        self.pending_tries -= 1
 665                        return service, service_root
 666                    elif self.empty():
 667                        self.pending_tries_notification.notify_all()
 668                        raise queue.Empty
 669                    else:
 670                        self.pending_tries_notification.wait()
 671
 672
 673    class _KeepWriterThreadPool:
 674        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
 675            self.total_task_nr = 0
 676            if (not max_service_replicas) or (max_service_replicas >= copies):
 677                num_threads = 1
 678            else:
 679                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
 680            _logger.debug("Pool max threads is %d", num_threads)
 681            self.workers = []
 682            self.queue = KeepClient._KeepWriterQueue(copies, classes)
 683            # Create workers
 684            for _ in range(num_threads):
 685                w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout)
 686                self.workers.append(w)
 687
 688        def add_task(self, ks, service_root):
 689            self.queue.put((ks, service_root))
 690            self.total_task_nr += 1
 691
 692        def done(self):
 693            return self.queue.successful_copies, self.queue.satisfied_classes()
 694
 695        def join(self):
 696            # Start workers
 697            for worker in self.workers:
 698                worker.start()
 699            # Wait for finished work
 700            self.queue.join()
 701
 702        def response(self):
 703            return self.queue.response
 704
 705
 706    class _KeepWriterThread(threading.Thread):
 707        class TaskFailed(RuntimeError):
 708            """Exception for failed Keep writes
 709
 710            TODO: Move this class to the module top level and document it
 711
 712            @private
 713            """
 714
 715
 716        def __init__(self, queue, data, data_hash, timeout=None):
 717            super().__init__()
 718            self.timeout = timeout
 719            self.queue = queue
 720            self.data = data
 721            self.data_hash = data_hash
 722            self.daemon = True
 723
 724        def run(self):
 725            while True:
 726                try:
 727                    service, service_root = self.queue.get_next_task()
 728                except queue.Empty:
 729                    return
 730                try:
 731                    locator, copies, classes = self.do_task(service, service_root)
 732                except Exception as e:
 733                    if not isinstance(e, self.TaskFailed):
 734                        _logger.exception("Exception in _KeepWriterThread")
 735                    self.queue.write_fail(service)
 736                else:
 737                    self.queue.write_success(locator, copies, classes)
 738                finally:
 739                    self.queue.task_done()
 740
 741        def do_task(self, service, service_root):
 742            classes = self.queue.pending_classes()
 743            headers = {}
 744            if len(classes) > 0:
 745                classes.sort()
 746                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
 747            success = bool(service.put(self.data_hash,
 748                                        self.data,
 749                                        timeout=self.timeout,
 750                                        headers=headers))
 751            result = service.last_result()
 752
 753            if not success:
 754                if result.get('status_code'):
 755                    _logger.debug("Request fail: PUT %s => %s %s",
 756                                  self.data_hash,
 757                                  result.get('status_code'),
 758                                  result.get('body'))
 759                raise self.TaskFailed()
 760
 761            _logger.debug("_KeepWriterThread %s succeeded %s+%i %s",
 762                          str(threading.current_thread()),
 763                          self.data_hash,
 764                          len(self.data),
 765                          service_root)
 766            try:
 767                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
 768            except (KeyError, ValueError):
 769                replicas_stored = 1
 770
 771            classes_confirmed = {}
 772            try:
 773                scch = result['headers']['x-keep-storage-classes-confirmed']
 774                for confirmation in scch.replace(' ', '').split(','):
 775                    if '=' in confirmation:
 776                        stored_class, stored_copies = confirmation.split('=')[:2]
 777                        classes_confirmed[stored_class] = int(stored_copies)
 778            except (KeyError, ValueError):
 779                # Storage classes confirmed header missing or corrupt
 780                classes_confirmed = None
 781
 782            return result['body'].strip(), replicas_stored, classes_confirmed
 783
 784
 785    def __init__(self, api_client=None, proxy=None,
 786                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
 787                 api_token=None, local_store=None, block_cache=None,
 788                 num_retries=10, session=None, num_prefetch_threads=None):
 789        """Initialize a new KeepClient.
 790
 791        Arguments:
 792        :api_client:
 793          The API client to use to find Keep services.  If not
 794          provided, KeepClient will build one from available Arvados
 795          configuration.
 796
 797        :proxy:
 798          If specified, this KeepClient will send requests to this Keep
 799          proxy.  Otherwise, KeepClient will fall back to the setting of the
 800          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
 801          If you want to KeepClient does not use a proxy, pass in an empty
 802          string.
 803
 804        :timeout:
 805          The initial timeout (in seconds) for HTTP requests to Keep
 806          non-proxy servers.  A tuple of three floats is interpreted as
 807          (connection_timeout, read_timeout, minimum_bandwidth). A connection
 808          will be aborted if the average traffic rate falls below
 809          minimum_bandwidth bytes per second over an interval of read_timeout
 810          seconds. Because timeouts are often a result of transient server
 811          load, the actual connection timeout will be increased by a factor
 812          of two on each retry.
 813          Default: (2, 256, 32768).
 814
 815        :proxy_timeout:
 816          The initial timeout (in seconds) for HTTP requests to
 817          Keep proxies. A tuple of three floats is interpreted as
 818          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
 819          described above for adjusting connection timeouts on retry also
 820          applies.
 821          Default: (20, 256, 32768).
 822
 823        :api_token:
 824          If you're not using an API client, but only talking
 825          directly to a Keep proxy, this parameter specifies an API token
 826          to authenticate Keep requests.  It is an error to specify both
 827          api_client and api_token.  If you specify neither, KeepClient
 828          will use one available from the Arvados configuration.
 829
 830        :local_store:
 831          If specified, this KeepClient will bypass Keep
 832          services, and save data to the named directory.  If unspecified,
 833          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
 834          environment variable.  If you want to ensure KeepClient does not
 835          use local storage, pass in an empty string.  This is primarily
 836          intended to mock a server for testing.
 837
 838        :num_retries:
 839          The default number of times to retry failed requests.
 840          This will be used as the default num_retries value when get() and
 841          put() are called.  Default 10.
 842        """
 843        self.lock = threading.Lock()
 844        if proxy is None:
 845            if config.get('ARVADOS_KEEP_SERVICES'):
 846                proxy = config.get('ARVADOS_KEEP_SERVICES')
 847            else:
 848                proxy = config.get('ARVADOS_KEEP_PROXY')
 849        if api_token is None:
 850            if api_client is None:
 851                api_token = config.get('ARVADOS_API_TOKEN')
 852            else:
 853                api_token = api_client.api_token
 854        elif api_client is not None:
 855            raise ValueError(
 856                "can't build KeepClient with both API client and token")
 857        if local_store is None:
 858            local_store = os.environ.get('KEEP_LOCAL_STORE')
 859
 860        if api_client is None:
 861            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
 862        else:
 863            self.insecure = api_client.insecure
 864
 865        self.block_cache = block_cache if block_cache else KeepBlockCache()
 866        self.timeout = timeout
 867        self.proxy_timeout = proxy_timeout
 868        self._user_agent_pool = queue.LifoQueue()
 869        self.upload_counter = _Counter()
 870        self.download_counter = _Counter()
 871        self.put_counter = _Counter()
 872        self.get_counter = _Counter()
 873        self.hits_counter = _Counter()
 874        self.misses_counter = _Counter()
 875        self._storage_classes_unsupported_warning = False
 876        self._default_classes = []
 877        if num_prefetch_threads is not None:
 878            self.num_prefetch_threads = num_prefetch_threads
 879        else:
 880            self.num_prefetch_threads = 2
 881        self._prefetch_queue = None
 882        self._prefetch_threads = None
 883
 884        if local_store:
 885            self.local_store = local_store
 886            self.head = self.local_store_head
 887            self.get = self.local_store_get
 888            self.put = self.local_store_put
 889        else:
 890            self.num_retries = num_retries
 891            self.max_replicas_per_service = None
 892            if proxy:
 893                proxy_uris = proxy.split()
 894                for i in range(len(proxy_uris)):
 895                    if not proxy_uris[i].endswith('/'):
 896                        proxy_uris[i] += '/'
 897                    # URL validation
 898                    url = urllib.parse.urlparse(proxy_uris[i])
 899                    if not (url.scheme and url.netloc):
 900                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
 901                self.api_token = api_token
 902                self._gateway_services = {}
 903                self._keep_services = [{
 904                    'uuid': "00000-bi6l4-%015d" % idx,
 905                    'service_type': 'proxy',
 906                    '_service_root': uri,
 907                    } for idx, uri in enumerate(proxy_uris)]
 908                self._writable_services = self._keep_services
 909                self.using_proxy = True
 910                self._static_services_list = True
 911            else:
 912                # It's important to avoid instantiating an API client
 913                # unless we actually need one, for testing's sake.
 914                if api_client is None:
 915                    api_client = arvados.api('v1')
 916                self.api_client = api_client
 917                self.api_token = api_client.api_token
 918                self._gateway_services = {}
 919                self._keep_services = None
 920                self._writable_services = None
 921                self.using_proxy = None
 922                self._static_services_list = False
 923                try:
 924                    self._default_classes = [
 925                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
 926                except KeyError:
 927                    # We're talking to an old cluster
 928                    pass
 929
 930    def current_timeout(self, attempt_number):
 931        """Return the appropriate timeout to use for this client.
 932
 933        This is the proxy timeout setting if the backend service is
 934        currently a proxy, and the regular timeout setting otherwise.  The
 935        `attempt_number` indicates how many times the operation has been
 936        tried already (starting from 0 for the first try), and scales the
 937        connection timeout portion of the return value accordingly.
 938
 939        """
 940        # TODO(twp): the timeout should be a property of a
 941        # _KeepService, not a KeepClient. See #4488.
 942        t = self.proxy_timeout if self.using_proxy else self.timeout
 943        if len(t) == 2:
 944            return (t[0] * (1 << attempt_number), t[1])
 945        else:
 946            return (t[0] * (1 << attempt_number), t[1], t[2])
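
    # Illustrative behavior (example values, not part of the original
    # module): with the default non-proxy timeout (2, 256, 32768), only
    # the connection timeout grows with each retry:
    #
    #   client.current_timeout(0)  # -> (2, 256, 32768)
    #   client.current_timeout(2)  # -> (8, 256, 32768)
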
 947    def _any_nondisk_services(self, service_list):
 948        return any(ks.get('service_type', 'disk') != 'disk'
 949                   for ks in service_list)
 950
 951    def build_services_list(self, force_rebuild=False):
 952        if (self._static_services_list or
 953              (self._keep_services and not force_rebuild)):
 954            return
 955        with self.lock:
 956            try:
 957                keep_services = self.api_client.keep_services().accessible()
 958            except Exception:  # API server predates Keep services.
 959                keep_services = self.api_client.keep_disks().list()
 960
 961            # Gateway services are only used when specified by UUID,
 962            # so there's nothing to gain by filtering them by
 963            # service_type.
 964            self._gateway_services = {ks['uuid']: ks for ks in
 965                                      keep_services.execute()['items']}
 966            if not self._gateway_services:
 967                raise arvados.errors.NoKeepServersError()
 968
 969            # Precompute the base URI for each service.
 970            for r in self._gateway_services.values():
 971                host = r['service_host']
 972                if not host.startswith('[') and host.find(':') >= 0:
 973                    # IPv6 URIs must be formatted like http://[::1]:80/...
 974                    host = '[' + host + ']'
 975                r['_service_root'] = "{}://{}:{:d}/".format(
 976                    'https' if r['service_ssl_flag'] else 'http',
 977                    host,
 978                    r['service_port'])
 979
 980            _logger.debug(str(self._gateway_services))
 981            self._keep_services = [
 982                ks for ks in self._gateway_services.values()
 983                if not ks.get('service_type', '').startswith('gateway:')]
 984            self._writable_services = [ks for ks in self._keep_services
 985                                       if not ks.get('read_only')]
 986
 987            # For disk type services, max_replicas_per_service is 1
 988            # It is unknown (unlimited) for other service types.
 989            if self._any_nondisk_services(self._writable_services):
 990                self.max_replicas_per_service = None
 991            else:
 992                self.max_replicas_per_service = 1
 993
 994    def _service_weight(self, data_hash, service_uuid):
 995        """Compute the weight of a Keep service endpoint for a data
 996        block with a known hash.
 997
 998        The weight is md5(h + u) where u is the last 15 characters of
 999        the service endpoint's UUID.
1000        """
1001        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
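
    # Worked example (hypothetical hash and UUIDs): given data_hash h and
    # two service UUIDs u1 and u2, services are probed in descending order
    # of hashlib.md5((h + u[-15:]).encode()).hexdigest(), a form of
    # rendezvous (highest-random-weight) hashing that gives each block a
    # stable probe order across clients.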
1002
1003    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1004        """Return an array of Keep service endpoints, in the order in
1005        which they should be probed when reading or writing data with
1006        the given hash+hints.
1007        """
1008        self.build_services_list(force_rebuild)
1009
1010        sorted_roots = []
1011        # Use the services indicated by the given +K@... remote
1012        # service hints, if any are present and can be resolved to a
1013        # URI.
1014        for hint in locator.hints:
1015            if hint.startswith('K@'):
1016                if len(hint) == 7:
1017                    sorted_roots.append(
1018                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1019                elif len(hint) == 29:
1020                    svc = self._gateway_services.get(hint[2:])
1021                    if svc:
1022                        sorted_roots.append(svc['_service_root'])
1023
1024        # Sort the available local services by weight (heaviest first)
1025        # for this locator, and return their service_roots (base URIs)
1026        # in that order.
1027        use_services = self._keep_services
1028        if need_writable:
1029            use_services = self._writable_services
1030        self.using_proxy = self._any_nondisk_services(use_services)
1031        sorted_roots.extend([
1032            svc['_service_root'] for svc in sorted(
1033                use_services,
1034                reverse=True,
1035                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1036        _logger.debug("{}: {}".format(locator, sorted_roots))
1037        return sorted_roots
1038
1039    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1040        # roots_map is a dictionary, mapping Keep service root strings
1041        # to _KeepService objects.  Poll for Keep services, and add any
1042        # new ones to roots_map.  Return the current list of local
1043        # root strings.
1044        headers.setdefault('Authorization', "Bearer %s" % (self.api_token,))
1045        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1046        for root in local_roots:
1047            if root not in roots_map:
1048                roots_map[root] = self._KeepService(
1049                    root, self._user_agent_pool,
1050                    upload_counter=self.upload_counter,
1051                    download_counter=self.download_counter,
1052                    headers=headers,
1053                    insecure=self.insecure)
1054        return local_roots
1055
1056    @staticmethod
1057    def _check_loop_result(result):
1058        # KeepClient RetryLoops should save results as a 2-tuple: the
1059        # actual result of the request, and the number of servers available
1060        # to receive the request this round.
1061        # This method returns True if there's a real result, False if
1062        # there are no more servers available, otherwise None.
1063        if isinstance(result, Exception):
1064            return None
1065        result, tried_server_count = result
1066        if (result is not None) and (result is not False):
1067            return True
1068        elif tried_server_count < 1:
1069            _logger.info("No more Keep services to try; giving up")
1070            return False
1071        else:
1072            return None
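
    # For example (illustrative): an iteration that fetched a block from
    # one of three usable services saves (blob, 3), and the loop reports
    # success; an iteration that found no usable services saves (None, 0),
    # and the loop gives up.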
1073
1074    def get_from_cache(self, loc_s):
1075        """Fetch a block only if it is in the cache; otherwise return None."""
1076        locator = KeepLocator(loc_s)
1077        slot = self.block_cache.get(locator.md5sum)
1078        if slot is not None and slot.ready.is_set():
1079            return slot.get()
1080        else:
1081            return None
1082
1083    def refresh_signature(self, loc):
1084        """Ask Keep to fetch the remote block and return its local signature."""
1085        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1086        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
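
    # The HEAD request sent by refresh_signature carries a header of the
    # form (timestamp value is illustrative):
    #
    #   X-Keep-Signature: local, 2024-01-01T00:00:00Z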
1087
1088    @retry.retry_method
1089    def head(self, loc_s, **kwargs):
1090        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1091
1092    @retry.retry_method
1093    def get(self, loc_s, **kwargs):
1094        return self._get_or_head(loc_s, method="GET", **kwargs)
1095
1096    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1097        """Get data from Keep.
1098
1099        This method fetches one or more blocks of data from Keep.  It
1100        sends a request to each Keep service registered with the API
1101        server (or the proxy provided when this client was
1102        instantiated), then each service named in location hints, in
1103        sequence.  As soon as one service provides the data, it's
1104        returned.
1105
1106        Arguments:
1107        * loc_s: A string of one or more comma-separated locators to fetch.
1108          This method returns the concatenation of these blocks.
1109        * num_retries: The number of times to retry GET requests to
1110          *each* Keep server if it returns temporary failures, with
1111          exponential backoff.  Note that, in each loop, the method may try
1112          to fetch data from every available Keep service, along with any
1113          that are named in location hints in the locator.  The default value
1114          is set when the KeepClient is initialized.
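
        Example (sketch, assuming the block was previously stored;
        'acbd18db4cc2f85cedef654fccc4a4d8+3' is the locator of b'foo'):

          >>> keep.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
          b'foo'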
1115        """
1116        if ',' in loc_s:
1117            return b''.join(self.get(x) for x in loc_s.split(','))
1118
1119        self.get_counter.add(1)
1120
1121        request_id = (request_id or
1122                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1123                      arvados.util.new_request_id())
1124        if headers is None:
1125            headers = {}
1126        headers['X-Request-Id'] = request_id
1127
1128        slot = None
1129        blob = None
1130        try:
1131            locator = KeepLocator(loc_s)
1132            if method == "GET":
1133                while slot is None:
1134                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
1135                    if first:
1136                        # Fresh and empty "first time it is used" slot
1137                        break
1138                    if prefetch:
1139                        # This is a prefetch request to fill in the
1140                        # cache; there's no need to wait for the
1141                        # result, so if it's already in flight, return
1142                        # immediately.  Clear 'slot' so the finally
1143                        # block won't overwrite it in the block cache.
1144                        if slot.ready.is_set():
1145                            slot.get()
1146                        slot = None
1147                        return None
1148
1149                    blob = slot.get()
1150                    if blob is not None:
1151                        self.hits_counter.add(1)
1152                        return blob
1153
1154                    # If blob is None, this means either
1155                    #
1156                    # (a) another thread was fetching this block and
1157                    # failed with an error or
1158                    #
1159                    # (b) cache thrashing caused the slot to be
1160                    # evicted (content set to None) by another thread
1161                    # between the call to reserve_cache() and get().
1162                    #
1163                    # We'll handle these cases by reserving a new slot
1164                    # and then doing a full GET request.
1165                    slot = None
1166
1167            self.misses_counter.add(1)
1168
1169            # If the locator has hints specifying a prefix (indicating a
1170            # remote keepproxy) or the UUID of a local gateway service,
1171            # read data from the indicated service(s) instead of the usual
1172            # list of local disk services.
1173            hint_roots = ['https://keep.{}.arvadosapi.com/'.format(hint[2:])
1174                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1175            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1176                               for hint in locator.hints if (
1177                                       hint.startswith('K@') and
1178                                       len(hint) == 29 and
1179                                       self._gateway_services.get(hint[2:])
1180                                       )])
1181            # Map root URLs to their _KeepService objects.
1182            roots_map = {
1183                root: self._KeepService(root, self._user_agent_pool,
1184                                       upload_counter=self.upload_counter,
1185                                       download_counter=self.download_counter,
1186                                       headers=headers,
1187                                       insecure=self.insecure)
1188                for root in hint_roots
1189            }
1190
1191            # See #3147 for a discussion of the loop implementation.  Highlights:
1192            # * Refresh the list of Keep services after each failure, in case
1193            #   it's being updated.
1194            # * Retry until we succeed, we're out of retries, or every available
1195            #   service has returned permanent failure.
1196            sorted_roots = []
1197            # roots_map already holds any services derived from locator hints.
1198            loop = retry.RetryLoop(num_retries, self._check_loop_result,
1199                                   backoff_start=2)
1200            for tries_left in loop:
1201                try:
1202                    sorted_roots = self.map_new_services(
1203                        roots_map, locator,
1204                        force_rebuild=(tries_left < num_retries),
1205                        need_writable=False,
1206                        headers=headers)
1207                except Exception as error:
1208                    loop.save_result(error)
1209                    continue
1210
1211                # Query _KeepService objects that haven't returned
1212                # permanent failure, in our specified shuffle order.
1213                services_to_try = [roots_map[root]
1214                                   for root in sorted_roots
1215                                   if roots_map[root].usable()]
1216                for keep_service in services_to_try:
1217                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1218                    if blob is not None:
1219                        break
1220                loop.save_result((blob, len(services_to_try)))
1221
1222            # Always cache the result, then return it if we succeeded.
1223            if loop.success():
1224                return blob
1225        finally:
1226            if slot is not None:
1227                self.block_cache.set(slot, blob)
1228
1229        # Q: Including 403 is necessary for the Keep tests to continue
1230        # passing, but maybe they should expect KeepReadError instead?
1231        not_founds = sum(1 for key in sorted_roots
1232                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1233        service_errors = ((key, roots_map[key].last_result()['error'])
1234                          for key in sorted_roots)
1235        if not roots_map:
1236            raise arvados.errors.KeepReadError(
1237                "[{}] failed to read {}: no Keep services available ({})".format(
1238                    request_id, loc_s, loop.last_result()))
1239        elif not_founds == len(sorted_roots):
1240            raise arvados.errors.NotFoundError(
1241                "[{}] {} not found".format(request_id, loc_s), service_errors)
1242        else:
1243            raise arvados.errors.KeepReadError(
1244                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1245
1246    @retry.retry_method
1247    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1248        """Save data in Keep.
1249
1250        This method will get a list of Keep services from the API server, and
1251        send the data to each one simultaneously in a new thread.  Once the
1252        uploads are finished, if enough copies are saved, this method returns
1253        the most recent HTTP response body.  If requests fail to upload
1254        enough copies, this method raises KeepWriteError.
1255
1256        Arguments:
1257        * data: The string of data to upload.
1258        * copies: The number of copies that the user requires be saved.
1259          Default 2.
1260        * num_retries: The number of times to retry PUT requests to
1261          *each* Keep server if it returns temporary failures, with
1262          exponential backoff.  The default value is set when the
1263          KeepClient is initialized.
1264        * classes: An optional list of storage class names where copies should
1265          be written.
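
        Example (sketch; the signature portion of the returned locator
        varies per cluster and token):

          >>> loc = keep.put(b'foo', copies=2)
          >>> loc.split('+')[0]
          'acbd18db4cc2f85cedef654fccc4a4d8'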
1266        """
1267
1268        classes = classes or self._default_classes
1269
1270        if not isinstance(data, bytes):
1271            data = data.encode()
1272
1273        self.put_counter.add(1)
1274
1275        data_hash = hashlib.md5(data).hexdigest()
1276        loc_s = data_hash + '+' + str(len(data))
1277        if copies < 1:
1278            return loc_s
1279        locator = KeepLocator(loc_s)
1280
1281        request_id = (request_id or
1282                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1283                      arvados.util.new_request_id())
1284        headers = {
1285            'X-Request-Id': request_id,
1286            'X-Keep-Desired-Replicas': str(copies),
1287        }
1288        roots_map = {}
1289        loop = retry.RetryLoop(num_retries, self._check_loop_result,
1290                               backoff_start=2)
1291        done_copies = 0
1292        done_classes = []
1293        for tries_left in loop:
1294            try:
1295                sorted_roots = self.map_new_services(
1296                    roots_map, locator,
1297                    force_rebuild=(tries_left < num_retries),
1298                    need_writable=True,
1299                    headers=headers)
1300            except Exception as error:
1301                loop.save_result(error)
1302                continue
1303
1304            pending_classes = []
1305            if done_classes is not None:
1306                pending_classes = list(set(classes) - set(done_classes))
1307            writer_pool = KeepClient._KeepWriterThreadPool(
1308                data=data,
1309                data_hash=data_hash,
1310                copies=copies - done_copies,
1311                max_service_replicas=self.max_replicas_per_service,
1312                timeout=self.current_timeout(num_retries - tries_left),
1313                classes=pending_classes,
1314            )
1315            for service_root, ks in [(root, roots_map[root])
1316                                     for root in sorted_roots]:
1317                if ks.finished():
1318                    continue
1319                writer_pool.add_task(ks, service_root)
1320            writer_pool.join()
1321            pool_copies, pool_classes = writer_pool.done()
1322            done_copies += pool_copies
1323            if (done_classes is not None) and (pool_classes is not None):
1324                done_classes += pool_classes
1325                loop.save_result(
1326                    (done_copies >= copies and set(done_classes) == set(classes),
1327                    writer_pool.total_task_nr))
1328            else:
1329                # Old keepstore contacted without storage classes support:
1330                # success is determined only by successful copies.
1331                #
1332                # Disable storage classes tracking from this point forward.
1333                if not self._storage_classes_unsupported_warning:
1334                    self._storage_classes_unsupported_warning = True
1335                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1336                done_classes = None
1337                loop.save_result(
1338                    (done_copies >= copies, writer_pool.total_task_nr))
1339
1340        if loop.success():
1341            return writer_pool.response()
1342        if not roots_map:
1343            raise arvados.errors.KeepWriteError(
1344                "[{}] failed to write {}: no Keep services available ({})".format(
1345                    request_id, data_hash, loop.last_result()))
1346        else:
1347            service_errors = ((key, roots_map[key].last_result()['error'])
1348                              for key in sorted_roots
1349                              if roots_map[key].last_result()['error'])
1350            raise arvados.errors.KeepWriteError(
1351                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1352                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1353
1354    def _block_prefetch_worker(self):
1355        """The background downloader thread."""
1356        while True:
1357            try:
1358                b = self._prefetch_queue.get()
1359                if b is None:
1360                    return
1361                self.get(b, prefetch=True)
1362            except Exception:
1363                _logger.exception("Exception doing block prefetch")
1364
1365    def _start_prefetch_threads(self):
1366        if self._prefetch_threads is None:
1367            with self.lock:
1368                if self._prefetch_threads is not None:
1369                    return
1370                self._prefetch_queue = queue.Queue()
1371                self._prefetch_threads = []
1372                for i in range(0, self.num_prefetch_threads):
1373                    thread = threading.Thread(target=self._block_prefetch_worker)
1374                    self._prefetch_threads.append(thread)
1375                    thread.daemon = True
1376                    thread.start()
1377
1378    def block_prefetch(self, locator):
1379        """Start a background download of the given block if it is not
1380        already cached.  This relies on the fact that KeepClient implements
1381        a block cache, so repeated requests for the same block will not
1382        result in repeated downloads (unless the block is evicted from the
1383        cache).  This method does not block.
1384        """
1385
1386        if self.block_cache.get(locator) is not None:
1387            return
1388
1389        self._start_prefetch_threads()
1390        self._prefetch_queue.put(locator)
1391
1392    def stop_prefetch_threads(self):
1393        with self.lock:
1394            if self._prefetch_threads is not None:
1395                for t in self._prefetch_threads:
1396                    self._prefetch_queue.put(None)
1397                for t in self._prefetch_threads:
1398                    t.join()
1399            self._prefetch_threads = None
1400            self._prefetch_queue = None
1401
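    # Typical prefetch lifecycle (illustrative sketch):
    #
    #   keep.block_prefetch(loc)       # queue a background download
    #   data = keep.get(loc)           # likely served from the block cache
    #   keep.stop_prefetch_threads()   # signal workers and join them
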
1402    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1403        """A stub for put().
1404
1405        This method is used in place of the real put() method when
1406        using local storage (see constructor's local_store argument).
1407
1408        copies and num_retries arguments are ignored: they are here
1409        only for the sake of offering the same call signature as
1410        put().
1411
1412        Data stored this way can be retrieved via local_store_get().
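
        Example (sketch, assuming local_store points at a writable
        directory):

          >>> kc = KeepClient(local_store='/tmp/keep-demo')
          >>> kc.put(b'foo')
          'acbd18db4cc2f85cedef654fccc4a4d8+3'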
1413        """
1414        md5 = hashlib.md5(data).hexdigest()
1415        locator = '%s+%d' % (md5, len(data))
1416        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1417            f.write(data)
1418        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1419                  os.path.join(self.local_store, md5))
1420        return locator
1421
1422    def local_store_get(self, loc_s, num_retries=None):
1423        """Companion to local_store_put()."""
1424        try:
1425            locator = KeepLocator(loc_s)
1426        except ValueError:
1427            raise arvados.errors.NotFoundError(
1428                "Invalid data locator: '%s'" % loc_s)
1429        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1430            return b''
1431        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1432            return f.read()
1433
1434    def local_store_head(self, loc_s, num_retries=None):
1435        """Companion to local_store_put()."""
1436        try:
1437            locator = KeepLocator(loc_s)
1438        except ValueError:
1439            raise arvados.errors.NotFoundError(
1440                "Invalid data locator: '%s'" % loc_s)
1441        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1442            return True
1443        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1444            return True