arvados.keep

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import copy
import collections
import datetime
import hashlib
import errno
import io
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
import resource
import urllib.parse
import traceback
import weakref

from io import BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

from ._internal import basedirs, diskcache, Timer
from ._internal.pycurl import PyCurlHelper

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # Unpacking (or the hex-validating setters) raises
            # ValueError for a malformed hint; re-raise with a clearer
            # message.  (A bare IndexError can never occur here.)
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt


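# Illustrative example (not executed; the hint below is made up).  A
# full Keep locator is an MD5 hash plus a size and optional hints such
# as a signature ("A...") or a remote cluster ("K@...").  The locator
# here is the MD5 of the empty string:
#
#     >>> loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0+Kzzzzz')
#     >>> loc.md5sum
#     'd41d8cd98f00b204e9800998ecf8427e'
#     >>> loc.size
#     0
#     >>> loc.hints
#     ['Kzzzzz']
#     >>> loc.stripped()
#     'd41d8cd98f00b204e9800998ecf8427e+0'
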
class KeepBlockCache(object):
    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
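
        # Worked example of the sizing above (illustrative numbers): on
        # a 500 GiB disk with 200 GiB free, an empty cache, and the
        # default 128 slots, the candidates are 10% of disk = 50 GiB,
        # 25% of available = 50 GiB, and 128 slots * 64 MiB = 8 GiB, so
        # cache_max becomes 8 GiB.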

        self.cache_total = 0
        if self._disk_cache:
            self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()

    class _CacheSlot:
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            self.content = None

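    # Typical _CacheSlot lifecycle (sketch): reserve_cache() returns a
    # slot; reader threads block in slot.get() until the fetching
    # thread calls slot.set(data), which also sets the ready event;
    # cap_cache() may later call slot.evict() to drop the content.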

    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                continue

            sz = slot.size()
            slot.evict()
            self.cache_total -= sz
            del self._cache[slot.locator]


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                del self._cache[n.locator]
                return None
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache._CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception:
            pass
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # We only get here if there was an error the first time.
            # The exception handler adjusts limits downward in some
            # cases to free up resources, which may allow this retry
            # to succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()

    def clear(self):
        with self._cache_lock:
            self._cache.clear()
            self.cache_total = 0

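# Typical consumer pattern (sketch; this is what KeepClient._get_or_head
# does internally):
#
#     cache = KeepBlockCache()
#     slot, first = cache.reserve_cache(locator_md5)
#     if first:
#         data = fetch_block_somehow()   # hypothetical fetch step
#         cache.set(slot, data)
#     else:
#         data = slot.get()   # blocks until the fetcher calls set()
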
class _Counter:
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT

    class _KeepService(PyCurlHelper):
        """Make requests to a single Keep service, and track results.

        A _KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={},
                     insecure=False):
            super().__init__()
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter
            self.insecure = insecure

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except Exception:
                ua.close()

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    else:
                        curl.setopt(pycurl.HTTPGET, True)
                    self._setcurltimeouts(curl, timeout, method=="HEAD")

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result['headers'].get('content-length'))
                if self._result['headers'].get('x-keep-locator'):
                    # This is a response to a remote block copy request, return
                    # the local copy block locator.
                    return self._result['headers'].get('x-keep-locator')
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None, headers={}):
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False  # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True


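    # Illustrative _KeepService lifecycle (sketch; the root URL and
    # token are made up, and callers normally go through KeepClient
    # rather than using this class directly):
    #
    #     svc = KeepClient._KeepService(
    #         'https://keep0.example.arvadosapi.com/',
    #         headers={'Authorization': 'Bearer <token>'})
    #     body = svc.get(KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0'))
    #     svc.last_result()   # dict with status_code/body/headers/error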
    class _KeepWriterQueue(queue.Queue):
        def __init__(self, copies, classes=[]):
            super().__init__()
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Tell the outer loop that storage class tracking
                    # has been disabled.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

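        # Worked example of the tracking above (illustrative values):
        # with wanted_copies=2 and wanted_storage_classes=['archive',
        # 'default'], after one PUT confirms {'default': 2},
        # pending_classes() returns ['archive'] and pending_tries
        # becomes max(2 - 2, 1) = 1, so one more write is attempted for
        # the unsatisfied class.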
        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class _KeepWriterThreadPool:
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
            self.total_task_nr = 0
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient._KeepWriterQueue(copies, classes)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies, self.queue.satisfied_classes()

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response


    class _KeepWriterThread(threading.Thread):
        class TaskFailed(RuntimeError):
            """Exception for failed Keep writes

            TODO: Move this class to the module top level and document it

            @private
            """


        def __init__(self, queue, data, data_hash, timeout=None):
            super().__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in _KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("_KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed

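        # Example of the header parsed above (illustrative values): a
        # response containing
        # "X-Keep-Storage-Classes-Confirmed: default=2, archive=1"
        # yields classes_confirmed == {'default': 2, 'archive': 1}.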

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = _Counter()
        self.download_counter = _Counter()
        self.put_counter = _Counter()
        self.get_counter = _Counter()
        self.hits_counter = _Counter()
        self.misses_counter = _Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # _KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
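
    # Example of the scaling above, assuming the documented default
    # timeout of (2, 256, 32768): attempt 0 -> (2, 256, 32768),
    # attempt 1 -> (4, 256, 32768), attempt 3 -> (16, 256, 32768).
    # Only the connection timeout grows; the read timeout and bandwidth
    # floor stay fixed.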
    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

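    # This is rendezvous (highest-random-weight) hashing: every client
    # computes the same per-service weight for a given block, so the
    # probe order is consistent cluster-wide without shared state.
    # Sketch of how weighted_service_roots() uses it (names here are
    # illustrative):
    #
    #     weights = {uuid: self._service_weight(data_hash, uuid)
    #                for uuid in service_uuids}
    #     probe_order = sorted(service_uuids, key=weights.get, reverse=True)
    #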
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to _KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "Bearer %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self._KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    headers=headers,
                    insecure=self.insecure)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc_s):
        """Fetch a block only if it is in the cache, otherwise return None."""
        locator = KeepLocator(loc_s)
        slot = self.block_cache.get(locator.md5sum)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    def refresh_signature(self, loc):
        """Ask Keep to get the remote block and return its local signature"""
        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

    @retry.retry_method
    def head(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="HEAD", **kwargs)

    @retry.retry_method
    def get(self, loc_s, **kwargs):
        return self._get_or_head(loc_s, method="GET", **kwargs)

    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then to each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            # Blocks are bytes, so join with a bytes separator.
            return b''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # This is a prefetch request to fill in the
                        # cache; we don't need to wait for the result,
                        # so if the fetch is already in flight, return
                        # immediately.  Clear 'slot' to prevent the
                        # finally block from calling slot.set().
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)


            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
            # Map root URLs to their _KeepService objects.
            roots_map = {
                root: self._KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            roots_map = {}
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
1204                try:
1205                    sorted_roots = self.map_new_services(
1206                        roots_map, locator,
1207                        force_rebuild=(tries_left < num_retries),
1208                        need_writable=False,
1209                        headers=headers)
1210                except Exception as error:
1211                    loop.save_result(error)
1212                    continue
1213
1214                # Query _KeepService objects that haven't returned
1215                # permanent failure, in our specified shuffle order.
1216                services_to_try = [roots_map[root]
1217                                   for root in sorted_roots
1218                                   if roots_map[root].usable()]
1219                for keep_service in services_to_try:
1220                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1221                    if blob is not None:
1222                        break
1223                loop.save_result((blob, len(services_to_try)))
1224
1225            # Always cache the result, then return it if we succeeded.
1226            if loop.success():
1227                return blob
1228        finally:
1229            if slot is not None:
1230                self.block_cache.set(slot, blob)
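                    # Note: when the read failed, blob is still None here;
                    # setting the slot anyway marks it ready, so any other
                    # threads blocked in slot.get() wake up and can retry
                    # instead of waiting forever.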
1231
1232        # Q: Including 403 is necessary for the Keep tests to continue
1233        # passing, but maybe they should expect KeepReadError instead?
1234        not_founds = sum(1 for key in sorted_roots
1235                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1236        service_errors = ((key, roots_map[key].last_result()['error'])
1237                          for key in sorted_roots)
1238        if not roots_map:
1239            raise arvados.errors.KeepReadError(
1240                "[{}] failed to read {}: no Keep services available ({})".format(
1241                    request_id, loc_s, loop.last_result()))
1242        elif not_founds == len(sorted_roots):
1243            raise arvados.errors.NotFoundError(
1244                "[{}] {} not found".format(request_id, loc_s), service_errors)
1245        else:
1246            raise arvados.errors.KeepReadError(
1247                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1248
1249    @retry.retry_method
1250    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1251        """Save data in Keep.
1252
1253        This method will get a list of Keep services from the API server, and
1254        send the data to each one simultaneously in a new thread.  Once the
1255        uploads are finished, if enough copies are saved, this method returns
1256        the most recent HTTP response body.  If requests fail to upload
1257        enough copies, this method raises KeepWriteError.
1258
1259        Arguments:
1260        * data: The data to upload, as bytes (a str will be UTF-8 encoded).
1261        * copies: The number of copies that the user requires be saved.
1262          Default 2.
1263        * num_retries: The number of times to retry PUT requests to
1264          *each* Keep server if it returns temporary failures, with
1265          exponential backoff.  The default value is set when the
1266          KeepClient is initialized.
1267        * classes: An optional list of storage class names where copies should
1268          be written.
1269        """
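            # Usage sketch (hypothetical client setup, not part of the
            # original source):
            #     kc = KeepClient()
            #     loc = kc.put(b'foo', copies=2)
            #     # e.g. 'acbd18db4cc2f85cedef654fccc4a4d8+3+A<sig>@<expiry>'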
1270
1271        classes = classes or self._default_classes
1272
1273        if not isinstance(data, bytes):
1274            data = data.encode()
1275
1276        self.put_counter.add(1)
1277
1278        data_hash = hashlib.md5(data).hexdigest()
1279        loc_s = data_hash + '+' + str(len(data))
1280        if copies < 1:
1281            return loc_s
1282        locator = KeepLocator(loc_s)
1283
1284        request_id = (request_id or
1285                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1286                      arvados.util.new_request_id())
1287        headers = {
1288            'X-Request-Id': request_id,
1289            'X-Keep-Desired-Replicas': str(copies),
1290        }
1291        roots_map = {}
1292        loop = retry.RetryLoop(num_retries, self._check_loop_result,
1293                               backoff_start=2)
1294        done_copies = 0
1295        done_classes = []
1296        for tries_left in loop:
1297            try:
1298                sorted_roots = self.map_new_services(
1299                    roots_map, locator,
1300                    force_rebuild=(tries_left < num_retries),
1301                    need_writable=True,
1302                    headers=headers)
1303            except Exception as error:
1304                loop.save_result(error)
1305                continue
1306
1307            pending_classes = []
1308            if done_classes is not None:
1309                pending_classes = list(set(classes) - set(done_classes))
1310            writer_pool = KeepClient._KeepWriterThreadPool(
1311                data=data,
1312                data_hash=data_hash,
1313                copies=copies - done_copies,
1314                max_service_replicas=self.max_replicas_per_service,
1315                timeout=self.current_timeout(num_retries - tries_left),
1316                classes=pending_classes,
1317            )
1318            for service_root, ks in [(root, roots_map[root])
1319                                     for root in sorted_roots]:
1320                if ks.finished():
1321                    continue
1322                writer_pool.add_task(ks, service_root)
1323            writer_pool.join()
1324            pool_copies, pool_classes = writer_pool.done()
1325            done_copies += pool_copies
1326            if (done_classes is not None) and (pool_classes is not None):
1327                done_classes += pool_classes
1328                loop.save_result(
1329                    (done_copies >= copies and set(done_classes) == set(classes),
1330                    writer_pool.total_task_nr))
1331            else:
1332                # Old keepstore contacted without storage classes support:
1333                # success is determined only by successful copies.
1334                #
1335                # Disable storage classes tracking from this point forward.
1336                if not self._storage_classes_unsupported_warning:
1337                    self._storage_classes_unsupported_warning = True
1338                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1339                done_classes = None
1340                loop.save_result(
1341                    (done_copies >= copies, writer_pool.total_task_nr))
1342
1343        if loop.success():
1344            return writer_pool.response()
1345        if not roots_map:
1346            raise arvados.errors.KeepWriteError(
1347                "[{}] failed to write {}: no Keep services available ({})".format(
1348                    request_id, data_hash, loop.last_result()))
1349        else:
1350            service_errors = ((key, roots_map[key].last_result()['error'])
1351                              for key in sorted_roots
1352                              if roots_map[key].last_result()['error'])
1353            raise arvados.errors.KeepWriteError(
1354                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1355                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1356
1357    def _block_prefetch_worker(self):
1358        """The background downloader thread."""
1359        while True:
1360            try:
1361                b = self._prefetch_queue.get()
1362                if b is None:
1363                    return
1364                self.get(b, prefetch=True)
1365            except Exception:
1366                _logger.exception("Exception doing block prefetch")
1367
1368    def _start_prefetch_threads(self):
1369        if self._prefetch_threads is None:
1370            with self.lock:
1371                if self._prefetch_threads is not None:
1372                    return
1373                self._prefetch_queue = queue.Queue()
1374                self._prefetch_threads = []
1375                for i in range(0, self.num_prefetch_threads):
1376                    thread = threading.Thread(target=self._block_prefetch_worker)
1377                    self._prefetch_threads.append(thread)
1378                    thread.daemon = True
1379                    thread.start()
1380
1381    def block_prefetch(self, locator):
1382        """Queue a background download of the given block.
1383
1384        This relies on the fact that KeepClient implements a block cache,
1385        so repeated requests for the same block will not result in repeated
1386        downloads (unless the block is evicted from the cache).  This
1387        method does not block."""
1388
1389        if self.block_cache.get(locator) is not None:
1390            return
1391
1392        self._start_prefetch_threads()
1393        self._prefetch_queue.put(locator)
1394
1395    def stop_prefetch_threads(self):
1396        with self.lock:
1397            if self._prefetch_threads is not None:
1398                for t in self._prefetch_threads:
1399                    self._prefetch_queue.put(None)
1400                for t in self._prefetch_threads:
1401                    t.join()
1402            self._prefetch_threads = None
1403            self._prefetch_queue = None
1404
1405    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1406        """A stub for put().
1407
1408        This method is used in place of the real put() method when
1409        using local storage (see constructor's local_store argument).
1410
1411        The copies, num_retries, and classes arguments are ignored: they
1412        are here only for the sake of offering the same call signature
1413        as put().
1414
1415        Data stored this way can be retrieved via local_store_get().
1416        """
1417        md5 = hashlib.md5(data).hexdigest()
1418        locator = '%s+%d' % (md5, len(data))
1419        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1420            f.write(data)
1421        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1422                  os.path.join(self.local_store, md5))
1423        return locator
1424
1425    def local_store_get(self, loc_s, num_retries=None):
1426        """Companion to local_store_put()."""
1427        try:
1428            locator = KeepLocator(loc_s)
1429        except ValueError:
1430            raise arvados.errors.NotFoundError(
1431                "Invalid data locator: '%s'" % loc_s)
1432        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1433            return b''
1434        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1435            return f.read()
1436
1437    def local_store_head(self, loc_s, num_retries=None):
1438        """Companion to local_store_put()."""
1439        try:
1440            locator = KeepLocator(loc_s)
1441        except ValueError:
1442            raise arvados.errors.NotFoundError(
1443                "Invalid data locator: '%s'" % loc_s)
1444        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1445            return True
1446        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1447            return True
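            # (Implicitly returns None when the block is absent from
            # local_store.)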
global_client_object = None
class KeepLocator:
 51class KeepLocator(object):
 52    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
 53    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
 54
 55    def __init__(self, locator_str):
 56        self.hints = []
 57        self._perm_sig = None
 58        self._perm_expiry = None
 59        pieces = iter(locator_str.split('+'))
 60        self.md5sum = next(pieces)
 61        try:
 62            self.size = int(next(pieces))
 63        except StopIteration:
 64            self.size = None
 65        for hint in pieces:
 66            if self.HINT_RE.match(hint) is None:
 67                raise ValueError("invalid hint format: {}".format(hint))
 68            elif hint.startswith('A'):
 69                self.parse_permission_hint(hint)
 70            else:
 71                self.hints.append(hint)
 72
 73    def __str__(self):
 74        return '+'.join(
 75            str(s)
 76            for s in [self.md5sum, self.size,
 77                      self.permission_hint()] + self.hints
 78            if s is not None)
 79
 80    def stripped(self):
 81        if self.size is not None:
 82            return "%s+%i" % (self.md5sum, self.size)
 83        else:
 84            return self.md5sum
 85
 86    def _make_hex_prop(name, length):
 87        # Build and return a new property with the given name that
 88        # must be a hex string of the given length.
 89        data_name = '_{}'.format(name)
 90        def getter(self):
 91            return getattr(self, data_name)
 92        def setter(self, hex_str):
 93            if not arvados.util.is_hex(hex_str, length):
 94                raise ValueError("{} is not a {}-digit hex string: {!r}".
 95                                 format(name, length, hex_str))
 96            setattr(self, data_name, hex_str)
 97        return property(getter, setter)
 98
 99    md5sum = _make_hex_prop('md5sum', 32)
100    perm_sig = _make_hex_prop('perm_sig', 40)
101
102    @property
103    def perm_expiry(self):
104        return self._perm_expiry
105
106    @perm_expiry.setter
107    def perm_expiry(self, value):
108        if not arvados.util.is_hex(value, 1, 8):
109            raise ValueError(
110                "permission timestamp must be a hex Unix timestamp: {}".
111                format(value))
112        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
113
114    def permission_hint(self):
115        data = [self.perm_sig, self.perm_expiry]
116        if None in data:
117            return None
118        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
119        return "A{}@{:08x}".format(*data)
120
121    def parse_permission_hint(self, s):
122        try:
123            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
124        except (IndexError, ValueError):
125            raise ValueError("bad permission hint {}".format(s))
126
127    def permission_expired(self, as_of_dt=None):
128        if self.perm_expiry is None:
129            return False
130        elif as_of_dt is None:
131            as_of_dt = datetime.datetime.now()
132        return self.perm_expiry <= as_of_dt
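   # Putting it together: a signed locator string has the shape
   #     <32-hex md5>+<size>+A<40-hex signature>@<8-hex expiry>[+<other hints>]
   # e.g. (hypothetical signature and timestamp values)
   #     "acbd18db4cc2f85cedef654fccc4a4d8+3+A<sig>@565761a6"
   # parse_permission_hint() splits the A hint into perm_sig and perm_expiry;
   # permission_hint() re-serializes them with "A{}@{:08x}".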
class KeepBlockCache:
135class KeepBlockCache(object):
136    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
137        self.cache_max = cache_max
138        self._cache = collections.OrderedDict()
139        self._cache_lock = threading.Lock()
140        self._max_slots = max_slots
141        self._disk_cache = disk_cache
142        self._disk_cache_dir = disk_cache_dir
143        self._cache_updating = threading.Condition(self._cache_lock)
144
145        if self._disk_cache and self._disk_cache_dir is None:
146            self._disk_cache_dir = str(basedirs.BaseDirectories('CACHE').storage_path('keep'))
147
148        if self._max_slots == 0:
149            if self._disk_cache:
150                # Each block uses two file descriptors, one used to
151                # open it initially and hold the flock(), and a second
152                # hidden one used by mmap().
153                #
154                # Set max slots to 1/8 of maximum file handles.  This
155                # means we'll use at most 1/4 of total file handles.
156                #
157                # NOFILE typically defaults to 1024 on Linux so this
158                # is 128 slots (256 file handles), which means we can
159                # cache up to 8 GiB of 64 MiB blocks.  This leaves
160                # 768 file handles for sockets and other stuff.
161                #
162                # When we want the ability to have more cache (e.g. in
163                # arv-mount) we'll increase rlimit before calling
164                # this.
165                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
166            else:
167                # RAM cache slots
168                self._max_slots = 512
169
170        if self.cache_max == 0:
171            if self._disk_cache:
172                fs = os.statvfs(self._disk_cache_dir)
173                # Calculation of available space incorporates existing cache usage
174                existing_usage = diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
175                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
176                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
177                # pick smallest of:
178                # 10% of total disk size
179                # 25% of available space
180                # max_slots * 64 MiB
181                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
182            else:
183                # 256 MiB in RAM
184                self.cache_max = (256 * 1024 * 1024)
185
186        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
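           # Worked example (hypothetical numbers): on a 100 GiB filesystem
           # with 40 GiB free, no existing cache, and 128 slots:
           #     maxdisk = 10 GiB, avail = 40 GiB / 4 = 10 GiB,
           #     slot cap = 128 * 64 MiB = 8 GiB,
           # so cache_max = min(10 GiB, 10 GiB, 8 GiB) = 8 GiB.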
187
188        self.cache_total = 0
189        if self._disk_cache:
190            self._cache = diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
191            for slot in self._cache.values():
192                self.cache_total += slot.size()
193            self.cap_cache()
194
195    class _CacheSlot:
196        __slots__ = ("locator", "ready", "content")
197
198        def __init__(self, locator):
199            self.locator = locator
200            self.ready = threading.Event()
201            self.content = None
202
203        def get(self):
204            self.ready.wait()
205            return self.content
206
207        def set(self, value):
208            if self.content is not None:
209                return False
210            self.content = value
211            self.ready.set()
212            return True
213
214        def size(self):
215            if self.content is None:
216                return 0
217            else:
218                return len(self.content)
219
220        def evict(self):
221            self.content = None
222
223
224    def _resize_cache(self, cache_max, max_slots):
225        # Try and make sure the contents of the cache do not exceed
226        # the supplied maximums.
227
228        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
229            return
230
231        _evict_candidates = collections.deque(self._cache.values())
232        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
233            slot = _evict_candidates.popleft()
234            if not slot.ready.is_set():
235                continue
236
237            sz = slot.size()
238            slot.evict()
239            self.cache_total -= sz
240            del self._cache[slot.locator]
241
242
243    def cap_cache(self):
244        '''Cap the cache size to self.cache_max'''
245        with self._cache_updating:
246            self._resize_cache(self.cache_max, self._max_slots)
247            self._cache_updating.notify_all()
248
249    def _get(self, locator):
250        # Test if the locator is already in the cache
251        if locator in self._cache:
252            n = self._cache[locator]
253            if n.ready.is_set() and n.content is None:
254                del self._cache[n.locator]
255                return None
256            self._cache.move_to_end(locator)
257            return n
258        if self._disk_cache:
259            # see if it exists on disk
260            n = diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
261            if n is not None:
262                self._cache[n.locator] = n
263                self.cache_total += n.size()
264                return n
265        return None
266
267    def get(self, locator):
268        with self._cache_lock:
269            return self._get(locator)
270
271    def reserve_cache(self, locator):
272        '''Reserve a cache slot for the specified locator,
273        or return the existing slot.'''
274        with self._cache_updating:
275            n = self._get(locator)
276            if n:
277                return n, False
278            else:
279                # Add a new cache slot for the locator
280                self._resize_cache(self.cache_max, self._max_slots-1)
281                while len(self._cache) >= self._max_slots:
282                    # If there isn't a slot available, need to wait
283                    # for something to happen that releases one of the
284                    # cache slots.  Idle for 200 ms or woken up by
285                    # another thread
286                    self._cache_updating.wait(timeout=0.2)
287                    self._resize_cache(self.cache_max, self._max_slots-1)
288
289                if self._disk_cache:
290                    n = diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
291                else:
292                    n = KeepBlockCache._CacheSlot(locator)
293                self._cache[n.locator] = n
294                return n, True
295
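       # Typical caller handshake (a sketch mirroring KeepClient._get_or_head;
       # fetch_block is hypothetical):
       #     slot, first = cache.reserve_cache(locator)
       #     if first:
       #         cache.set(slot, fetch_block(locator))
       #     else:
       #         blob = slot.get()  # blocks until the fetcher calls set()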
296    def set(self, slot, blob):
297        try:
298            if slot.set(blob):
299                self.cache_total += slot.size()
300            return
301        except OSError as e:
302            if e.errno == errno.ENOMEM:
303                # Reduce max slots to current - 4, cap cache and retry
304                with self._cache_lock:
305                    self._max_slots = max(4, len(self._cache) - 4)
306            elif e.errno == errno.ENOSPC:
307                # Reduce disk max space to current - 256 MiB, cap cache and retry
308                with self._cache_lock:
309                    sm = sum(st.size() for st in self._cache.values())
310                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
311            elif e.errno == errno.ENODEV:
312                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
313        except Exception:
314            pass  # fall through; retried below after cap_cache() runs
315        finally:
316            # Check if we should evict things from the cache.  Either
317            # because we added a new thing or there was an error and
318            # we possibly adjusted the limits down, so we might need
319            # to push something out.
320            self.cap_cache()
321
322        try:
323            # Only gets here if there was an error the first time. The
324            # exception handler adjusts limits downward in some cases
325            # to free up resources, which would make the operation
326            # succeed.
327            if slot.set(blob):
328                self.cache_total += slot.size()
329        except Exception as e:
330            # It failed again.  Give up.
331            slot.set(None)
332            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
333
334        self.cap_cache()
335
336    def clear(self):
337        with self._cache_lock:
338            self._cache.clear()
339            self.cache_total = 0
class KeepClient:
 355class KeepClient(object):
 356    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
 357    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
 358
 359    class _KeepService(PyCurlHelper):
 360        """Make requests to a single Keep service, and track results.
 361
 362        A _KeepService is intended to last long enough to perform one
 363        transaction (GET or PUT) against one Keep service. This can
 364        involve calling either get() or put() multiple times in order
 365        to retry after transient failures. However, calling both get()
 366        and put() on a single instance -- or using the same instance
 367        to access two different Keep services -- will not produce
 368        sensible behavior.
 369        """
 370
 371        HTTP_ERRORS = (
 372            socket.error,
 373            ssl.SSLError,
 374            arvados.errors.HttpError,
 375        )
 376
 377        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
 378                     upload_counter=None,
 379                     download_counter=None,
 380                     headers={},
 381                     insecure=False):
 382            super().__init__()
 383            self.root = root
 384            self._user_agent_pool = user_agent_pool
 385            self._result = {'error': None}
 386            self._usable = True
 387            self._session = None
 388            self._socket = None
 389            self.get_headers = {'Accept': 'application/octet-stream'}
 390            self.get_headers.update(headers)
 391            self.put_headers = headers
 392            self.upload_counter = upload_counter
 393            self.download_counter = download_counter
 394            self.insecure = insecure
 395
 396        def usable(self):
 397            """Is it worth attempting a request?"""
 398            return self._usable
 399
 400        def finished(self):
 401            """Did the request succeed or encounter permanent failure?"""
 402            return self._result['error'] == False or not self._usable
 403
 404        def last_result(self):
 405            return self._result
 406
 407        def _get_user_agent(self):
 408            try:
 409                return self._user_agent_pool.get(block=False)
 410            except queue.Empty:
 411                return pycurl.Curl()
 412
 413        def _put_user_agent(self, ua):
 414            try:
 415                ua.reset()
 416                self._user_agent_pool.put(ua, block=False)
 417            except:
 418                ua.close()
 419
 420        def get(self, locator, method="GET", timeout=None):
 421            # locator is a KeepLocator object.
 422            url = self.root + str(locator)
 423            _logger.debug("Request: %s %s", method, url)
 424            curl = self._get_user_agent()
 425            ok = None
 426            try:
 427                with Timer() as t:
 428                    self._headers = {}
 429                    response_body = BytesIO()
 430                    curl.setopt(pycurl.NOSIGNAL, 1)
 431                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 432                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 433                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 434                    curl.setopt(pycurl.HTTPHEADER, [
 435                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
 436                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 437                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 438                    if self.insecure:
 439                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 440                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 441                    else:
 442                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 443                    if method == "HEAD":
 444                        curl.setopt(pycurl.NOBODY, True)
 445                    else:
 446                        curl.setopt(pycurl.HTTPGET, True)
 447                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 448
 449                    try:
 450                        curl.perform()
 451                    except Exception as e:
 452                        raise arvados.errors.HttpError(0, str(e))
 453                    finally:
 454                        if self._socket:
 455                            self._socket.close()
 456                            self._socket = None
 457                    self._result = {
 458                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 459                        'body': response_body.getvalue(),
 460                        'headers': self._headers,
 461                        'error': False,
 462                    }
 463
 464                ok = retry.check_http_response_success(self._result['status_code'])
 465                if not ok:
 466                    self._result['error'] = arvados.errors.HttpError(
 467                        self._result['status_code'],
 468                        self._headers.get('x-status-line', 'Error'))
 469            except self.HTTP_ERRORS as e:
 470                self._result = {
 471                    'error': e,
 472                }
 473            self._usable = ok != False
 474            if self._result.get('status_code', None):
 475                # The client worked well enough to get an HTTP status
 476                # code, so presumably any problems are just on the
 477                # server side and it's OK to reuse the client.
 478                self._put_user_agent(curl)
 479            else:
 480                # Don't return this client to the pool, in case it's
 481                # broken.
 482                curl.close()
 483            if not ok:
 484                _logger.debug("Request fail: GET %s => %s: %s",
 485                              url, type(self._result['error']), str(self._result['error']))
 486                return None
 487            if method == "HEAD":
 488                _logger.info("HEAD %s: %s bytes",
 489                         self._result['status_code'],
 490                         self._result['headers'].get('content-length'))
 491                if self._result['headers'].get('x-keep-locator'):
 492                    # This is a response to a remote block copy request, return
 493                    # the local copy block locator.
 494                    return self._result['headers'].get('x-keep-locator')
 495                return True
 496
 497            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
 498                         self._result['status_code'],
 499                         len(self._result['body']),
 500                         t.msecs,
 501                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 502
 503            if self.download_counter:
 504                self.download_counter.add(len(self._result['body']))
 505            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
 506            if resp_md5 != locator.md5sum:
 507                _logger.warning("Checksum fail: md5(%s) = %s",
 508                                url, resp_md5)
 509                self._result['error'] = arvados.errors.HttpError(
 510                    0, 'Checksum fail')
 511                return None
 512            return self._result['body']
 513
 514        def put(self, hash_s, body, timeout=None, headers={}):
 515            put_headers = copy.copy(self.put_headers)
 516            put_headers.update(headers)
 517            url = self.root + hash_s
 518            _logger.debug("Request: PUT %s", url)
 519            curl = self._get_user_agent()
 520            ok = None
 521            try:
 522                with Timer() as t:
 523                    self._headers = {}
 524                    body_reader = BytesIO(body)
 525                    response_body = BytesIO()
 526                    curl.setopt(pycurl.NOSIGNAL, 1)
 527                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 528                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 529                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 530                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
 531                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
 532                    # response) instead of sending the request body immediately.
 533                    # This allows the server to reject the request if the request
 534                    # is invalid or the server is read-only, without waiting for
 535                    # the client to send the entire block.
 536                    curl.setopt(pycurl.UPLOAD, True)
 537                    curl.setopt(pycurl.INFILESIZE, len(body))
 538                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
 539                    curl.setopt(pycurl.HTTPHEADER, [
 540                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
 541                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 542                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 543                    if self.insecure:
 544                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 545                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 546                    else:
 547                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 548                    self._setcurltimeouts(curl, timeout)
 549                    try:
 550                        curl.perform()
 551                    except Exception as e:
 552                        raise arvados.errors.HttpError(0, str(e))
 553                    finally:
 554                        if self._socket:
 555                            self._socket.close()
 556                            self._socket = None
 557                    self._result = {
 558                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 559                        'body': response_body.getvalue().decode('utf-8'),
 560                        'headers': self._headers,
 561                        'error': False,
 562                    }
 563                ok = retry.check_http_response_success(self._result['status_code'])
 564                if not ok:
 565                    self._result['error'] = arvados.errors.HttpError(
 566                        self._result['status_code'],
 567                        self._headers.get('x-status-line', 'Error'))
 568            except self.HTTP_ERRORS as e:
 569                self._result = {
 570                    'error': e,
 571                }
 572            self._usable = ok != False # still usable if ok is True or None
 573            if self._result.get('status_code', None):
 574                # Client is functional. See comment in get().
 575                self._put_user_agent(curl)
 576            else:
 577                curl.close()
 578            if not ok:
 579                _logger.debug("Request fail: PUT %s => %s: %s",
 580                              url, type(self._result['error']), str(self._result['error']))
 581                return False
 582            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
 583                         self._result['status_code'],
 584                         len(body),
 585                         t.msecs,
 586                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
 587            if self.upload_counter:
 588                self.upload_counter.add(len(body))
 589            return True
 590
 591
 592    class _KeepWriterQueue(queue.Queue):
 593        def __init__(self, copies, classes=[]):
 594            queue.Queue.__init__(self) # Old-style superclass
 595            self.wanted_copies = copies
 596            self.wanted_storage_classes = classes
 597            self.successful_copies = 0
 598            self.confirmed_storage_classes = {}
 599            self.response = None
 600            self.storage_classes_tracking = True
 601            self.queue_data_lock = threading.RLock()
 602            self.pending_tries = max(copies, len(classes))
 603            self.pending_tries_notification = threading.Condition()
 604
 605        def write_success(self, response, replicas_nr, classes_confirmed):
 606            with self.queue_data_lock:
 607                self.successful_copies += replicas_nr
 608                if classes_confirmed is None:
 609                    self.storage_classes_tracking = False
 610                elif self.storage_classes_tracking:
 611                    for st_class, st_copies in classes_confirmed.items():
 612                        try:
 613                            self.confirmed_storage_classes[st_class] += st_copies
 614                        except KeyError:
 615                            self.confirmed_storage_classes[st_class] = st_copies
 616                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
 617                self.response = response
 618            with self.pending_tries_notification:
 619                self.pending_tries_notification.notify_all()
 620
 621        def write_fail(self, ks):
 622            with self.pending_tries_notification:
 623                self.pending_tries += 1
 624                self.pending_tries_notification.notify()
 625
 626        def pending_copies(self):
 627            with self.queue_data_lock:
 628                return self.wanted_copies - self.successful_copies
 629
 630        def satisfied_classes(self):
 631            with self.queue_data_lock:
 632                if not self.storage_classes_tracking:
 633                    # Notifies disabled storage classes expectation to
 634                    # the outer loop.
 635                    return None
 636            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
 637
 638        def pending_classes(self):
 639            with self.queue_data_lock:
 640                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
 641                    return []
 642                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
 643                for st_class, st_copies in self.confirmed_storage_classes.items():
 644                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
 645                        unsatisfied_classes.remove(st_class)
 646                return unsatisfied_classes
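            # e.g. (hypothetical state) wanted_copies=2,
            # wanted_storage_classes=['default', 'archive'], and
            # confirmed_storage_classes={'default': 2, 'archive': 1} leave
            # ['archive'] pending: it has fewer than wanted_copies
            # confirmed copies.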
 647
 648        def get_next_task(self):
 649            with self.pending_tries_notification:
 650                while True:
 651                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
 652                        # This notify_all() is unnecessary --
 653                        # write_success() already called notify_all()
 654                        # when pending<1 became true, so it's not
 655                        # possible for any other thread to be in
 656                        # wait() now -- but it's cheap insurance
 657                        # against deadlock so we do it anyway:
 658                        self.pending_tries_notification.notify_all()
 659                        # Drain the queue and then raise Queue.Empty
 660                        while True:
 661                            self.get_nowait()
 662                            self.task_done()
 663                    elif self.pending_tries > 0:
 664                        service, service_root = self.get_nowait()
 665                        if service.finished():
 666                            self.task_done()
 667                            continue
 668                        self.pending_tries -= 1
 669                        return service, service_root
 670                    elif self.empty():
 671                        self.pending_tries_notification.notify_all()
 672                        raise queue.Empty
 673                    else:
 674                        self.pending_tries_notification.wait()
 675
 676
 677    class _KeepWriterThreadPool:
 678        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
 679            self.total_task_nr = 0
 680            if (not max_service_replicas) or (max_service_replicas >= copies):
 681                num_threads = 1
 682            else:
 683                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
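                # e.g. copies=4 with max_service_replicas=1 (each disk
                # service stores a single replica) gives ceil(4/1) = 4
                # writer threads.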
 684            _logger.debug("Pool max threads is %d", num_threads)
 685            self.workers = []
 686            self.queue = KeepClient._KeepWriterQueue(copies, classes)
 687            # Create workers
 688            for _ in range(num_threads):
 689                w = KeepClient._KeepWriterThread(self.queue, data, data_hash, timeout)
 690                self.workers.append(w)
 691
 692        def add_task(self, ks, service_root):
 693            self.queue.put((ks, service_root))
 694            self.total_task_nr += 1
 695
 696        def done(self):
 697            return self.queue.successful_copies, self.queue.satisfied_classes()
 698
 699        def join(self):
 700            # Start workers
 701            for worker in self.workers:
 702                worker.start()
 703            # Wait for finished work
 704            self.queue.join()
 705
 706        def response(self):
 707            return self.queue.response
 708
 709
 710    class _KeepWriterThread(threading.Thread):
 711        class TaskFailed(RuntimeError):
 712            """Exception for failed Keep writes
 713
 714            TODO: Move this class to the module top level and document it
 715
 716            @private
 717            """
 718
 719
 720        def __init__(self, queue, data, data_hash, timeout=None):
 721            super().__init__()
 722            self.timeout = timeout
 723            self.queue = queue
 724            self.data = data
 725            self.data_hash = data_hash
 726            self.daemon = True
 727
 728        def run(self):
 729            while True:
 730                try:
 731                    service, service_root = self.queue.get_next_task()
 732                except queue.Empty:
 733                    return
 734                try:
 735                    locator, copies, classes = self.do_task(service, service_root)
 736                except Exception as e:
 737                    if not isinstance(e, self.TaskFailed):
 738                        _logger.exception("Exception in _KeepWriterThread")
 739                    self.queue.write_fail(service)
 740                else:
 741                    self.queue.write_success(locator, copies, classes)
 742                finally:
 743                    self.queue.task_done()
 744
 745        def do_task(self, service, service_root):
 746            classes = self.queue.pending_classes()
 747            headers = {}
 748            if len(classes) > 0:
 749                classes.sort()
 750                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
 751            success = bool(service.put(self.data_hash,
 752                                        self.data,
 753                                        timeout=self.timeout,
 754                                        headers=headers))
 755            result = service.last_result()
 756
 757            if not success:
 758                if result.get('status_code'):
 759                    _logger.debug("Request fail: PUT %s => %s %s",
 760                                  self.data_hash,
 761                                  result.get('status_code'),
 762                                  result.get('body'))
 763                raise self.TaskFailed()
 764
 765            _logger.debug("_KeepWriterThread %s succeeded %s+%i %s",
 766                          str(threading.current_thread()),
 767                          self.data_hash,
 768                          len(self.data),
 769                          service_root)
 770            try:
 771                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
 772            except (KeyError, ValueError):
 773                replicas_stored = 1
 774
 775            classes_confirmed = {}
 776            try:
 777                scch = result['headers']['x-keep-storage-classes-confirmed']
 778                for confirmation in scch.replace(' ', '').split(','):
 779                    if '=' in confirmation:
 780                        stored_class, stored_copies = confirmation.split('=')[:2]
 781                        classes_confirmed[stored_class] = int(stored_copies)
 782            except (KeyError, ValueError):
 783                # Storage classes confirmed header missing or corrupt
 784                classes_confirmed = None
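                # e.g. a confirmation header of the form (hypothetical values)
                #     X-Keep-Storage-Classes-Confirmed: default=2, archive=1
                # parses to {'default': 2, 'archive': 1}.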
 785
 786            return result['body'].strip(), replicas_stored, classes_confirmed
 787
 788
 789    def __init__(self, api_client=None, proxy=None,
 790                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
 791                 api_token=None, local_store=None, block_cache=None,
 792                 num_retries=10, session=None, num_prefetch_threads=None):
 793        """Initialize a new KeepClient.
 794
 795        Arguments:
 796        :api_client:
 797          The API client to use to find Keep services.  If not
 798          provided, KeepClient will build one from available Arvados
 799          configuration.
 800
 801        :proxy:
 802          If specified, this KeepClient will send requests to this Keep
 803          proxy.  Otherwise, KeepClient will fall back to the setting of the
 804          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
 805          If you want to ensure KeepClient does not use a proxy, pass in
 806          an empty string.
 807
 808        :timeout:
 809          The initial timeout (in seconds) for HTTP requests to Keep
 810          non-proxy servers.  A tuple of three floats is interpreted as
 811          (connection_timeout, read_timeout, minimum_bandwidth). A connection
 812          will be aborted if the average traffic rate falls below
 813          minimum_bandwidth bytes per second over an interval of read_timeout
 814          seconds. Because timeouts are often a result of transient server
 815          load, the actual connection timeout will be increased by a factor
 816          of two on each retry.
 817          Default: (2, 256, 32768).
 818
 819        :proxy_timeout:
 820          The initial timeout (in seconds) for HTTP requests to
 821          Keep proxies. A tuple of three floats is interpreted as
 822          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
 823          described above for adjusting connection timeouts on retry also
 824          applies.
 825          Default: (20, 256, 32768).
 826
 827        :api_token:
 828          If you're not using an API client, but only talking
 829          directly to a Keep proxy, this parameter specifies an API token
 830          to authenticate Keep requests.  It is an error to specify both
 831          api_client and api_token.  If you specify neither, KeepClient
 832          will use one available from the Arvados configuration.
 833
 834        :local_store:
 835          If specified, this KeepClient will bypass Keep
 836          services, and save data to the named directory.  If unspecified,
 837          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
 838          environment variable.  If you want to ensure KeepClient does not
 839          use local storage, pass in an empty string.  This is primarily
 840          intended to mock a server for testing.
 841
 842        :num_retries:
 843          The default number of times to retry failed requests.
 844          This will be used as the default num_retries value when get() and
 845          put() are called.  Default 10.
 846        """
 847        self.lock = threading.Lock()
 848        if proxy is None:
 849            if config.get('ARVADOS_KEEP_SERVICES'):
 850                proxy = config.get('ARVADOS_KEEP_SERVICES')
 851            else:
 852                proxy = config.get('ARVADOS_KEEP_PROXY')
 853        if api_token is None:
 854            if api_client is None:
 855                api_token = config.get('ARVADOS_API_TOKEN')
 856            else:
 857                api_token = api_client.api_token
 858        elif api_client is not None:
 859            raise ValueError(
 860                "can't build KeepClient with both API client and token")
 861        if local_store is None:
 862            local_store = os.environ.get('KEEP_LOCAL_STORE')
 863
 864        if api_client is None:
 865            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
 866        else:
 867            self.insecure = api_client.insecure
 868
 869        self.block_cache = block_cache if block_cache else KeepBlockCache()
 870        self.timeout = timeout
 871        self.proxy_timeout = proxy_timeout
 872        self._user_agent_pool = queue.LifoQueue()
 873        self.upload_counter = _Counter()
 874        self.download_counter = _Counter()
 875        self.put_counter = _Counter()
 876        self.get_counter = _Counter()
 877        self.hits_counter = _Counter()
 878        self.misses_counter = _Counter()
 879        self._storage_classes_unsupported_warning = False
 880        self._default_classes = []
 881        if num_prefetch_threads is not None:
 882            self.num_prefetch_threads = num_prefetch_threads
 883        else:
 884            self.num_prefetch_threads = 2
 885        self._prefetch_queue = None
 886        self._prefetch_threads = None
 887
 888        if local_store:
 889            self.local_store = local_store
 890            self.head = self.local_store_head
 891            self.get = self.local_store_get
 892            self.put = self.local_store_put
 893        else:
 894            self.num_retries = num_retries
 895            self.max_replicas_per_service = None
 896            if proxy:
 897                proxy_uris = proxy.split()
 898                for i in range(len(proxy_uris)):
 899                    if not proxy_uris[i].endswith('/'):
 900                        proxy_uris[i] += '/'
 901                    # URL validation
 902                    url = urllib.parse.urlparse(proxy_uris[i])
 903                    if not (url.scheme and url.netloc):
 904                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
 905                self.api_token = api_token
 906                self._gateway_services = {}
 907                self._keep_services = [{
 908                    'uuid': "00000-bi6l4-%015d" % idx,
 909                    'service_type': 'proxy',
 910                    '_service_root': uri,
 911                    } for idx, uri in enumerate(proxy_uris)]
 912                self._writable_services = self._keep_services
 913                self.using_proxy = True
 914                self._static_services_list = True
 915            else:
 916                # It's important to avoid instantiating an API client
 917                # unless we actually need one, for testing's sake.
 918                if api_client is None:
 919                    api_client = arvados.api('v1')
 920                self.api_client = api_client
 921                self.api_token = api_client.api_token
 922                self._gateway_services = {}
 923                self._keep_services = None
 924                self._writable_services = None
 925                self.using_proxy = None
 926                self._static_services_list = False
 927                try:
 928                    self._default_classes = [
 929                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
 930                except KeyError:
 931                    # We're talking to an old cluster
 932                    pass
 933
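    # Usage sketch (illustrative, not part of the class).  Assuming
    # ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured:
    #
    #   import arvados
    #   kc = arvados.KeepClient(api_client=arvados.api('v1'))
    #
    # Or bypass Keep services and use a local directory, e.g. for tests
    # ('/tmp/keep_local' is a hypothetical, pre-existing directory):
    #
    #   kc_local = arvados.KeepClient(local_store='/tmp/keep_local')
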
 934    def current_timeout(self, attempt_number):
 935        """Return the appropriate timeout to use for this client.
 936
 937        The proxy timeout setting if the backend service is currently a proxy,
 938        the regular timeout setting otherwise.  The `attempt_number` indicates
 939        how many times the operation has been tried already (starting from 0
 940        for the first try), and scales the connection timeout portion of the
 941        return value accordingly.
 942
 943        """
 944        # TODO(twp): the timeout should be a property of a
 945        # _KeepService, not a KeepClient. See #4488.
 946        t = self.proxy_timeout if self.using_proxy else self.timeout
 947        if len(t) == 2:
 948            return (t[0] * (1 << attempt_number), t[1])
 949        else:
 950            return (t[0] * (1 << attempt_number), t[1], t[2])
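
    # A standalone sketch of the doubling behavior above, assuming the
    # default three-float timeout tuple: only the connection timeout grows.
    #
    #   t = (2, 256, 32768)
    #   [(t[0] * (1 << n),) + t[1:] for n in range(3)]
    #   # -> [(2, 256, 32768), (4, 256, 32768), (8, 256, 32768)]
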
 951    def _any_nondisk_services(self, service_list):
 952        return any(ks.get('service_type', 'disk') != 'disk'
 953                   for ks in service_list)
 954
 955    def build_services_list(self, force_rebuild=False):
 956        if (self._static_services_list or
 957              (self._keep_services and not force_rebuild)):
 958            return
 959        with self.lock:
 960            try:
 961                keep_services = self.api_client.keep_services().accessible()
 962            except Exception:  # API server predates Keep services.
 963                keep_services = self.api_client.keep_disks().list()
 964
 965            # Gateway services are only used when specified by UUID,
 966            # so there's nothing to gain by filtering them by
 967            # service_type.
 968            self._gateway_services = {ks['uuid']: ks for ks in
 969                                      keep_services.execute()['items']}
 970            if not self._gateway_services:
 971                raise arvados.errors.NoKeepServersError()
 972
 973            # Precompute the base URI for each service.
 974            for r in self._gateway_services.values():
 975                host = r['service_host']
 976                if not host.startswith('[') and host.find(':') >= 0:
 977                    # IPv6 URIs must be formatted like http://[::1]:80/...
 978                    host = '[' + host + ']'
 979                r['_service_root'] = "{}://{}:{:d}/".format(
 980                    'https' if r['service_ssl_flag'] else 'http',
 981                    host,
 982                    r['service_port'])
 983
 984            _logger.debug(str(self._gateway_services))
 985            self._keep_services = [
 986                ks for ks in self._gateway_services.values()
 987                if not ks.get('service_type', '').startswith('gateway:')]
 988            self._writable_services = [ks for ks in self._keep_services
 989                                       if not ks.get('read_only')]
 990
 991            # For disk type services, max_replicas_per_service is 1.
 992            # It is unknown (unlimited) for other service types.
 993            if self._any_nondisk_services(self._writable_services):
 994                self.max_replicas_per_service = None
 995            else:
 996                self.max_replicas_per_service = 1
 997
 998    def _service_weight(self, data_hash, service_uuid):
 999        """Compute the weight of a Keep service endpoint for a data
1000        block with a known hash.
1001
 1002        The weight is md5(h + u), where h is the data hash and u is the
 1003        last 15 characters of the service endpoint's UUID.
 1004        """
1005        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1006
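    # The probe order built from these weights is rendezvous (highest
    # random weight) hashing.  A standalone sketch with made-up UUIDs:
    #
    #   import hashlib
    #   h = hashlib.md5(b'foo').hexdigest()
    #   uuids = ['zzzzz-bi6l4-%015d' % i for i in range(3)]  # hypothetical
    #   probe_order = sorted(
    #       uuids, reverse=True,
    #       key=lambda u: hashlib.md5((h + u[-15:]).encode()).hexdigest())
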
1007    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1008        """Return an array of Keep service endpoints, in the order in
1009        which they should be probed when reading or writing data with
1010        the given hash+hints.
1011        """
1012        self.build_services_list(force_rebuild)
1013
1014        sorted_roots = []
1015        # Use the services indicated by the given +K@... remote
1016        # service hints, if any are present and can be resolved to a
1017        # URI.
1018        for hint in locator.hints:
1019            if hint.startswith('K@'):
1020                if len(hint) == 7:
1021                    sorted_roots.append(
1022                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1023                elif len(hint) == 29:
1024                    svc = self._gateway_services.get(hint[2:])
1025                    if svc:
1026                        sorted_roots.append(svc['_service_root'])
1027
1028        # Sort the available local services by weight (heaviest first)
1029        # for this locator, and return their service_roots (base URIs)
1030        # in that order.
1031        use_services = self._keep_services
1032        if need_writable:
1033            use_services = self._writable_services
1034        self.using_proxy = self._any_nondisk_services(use_services)
1035        sorted_roots.extend([
1036            svc['_service_root'] for svc in sorted(
1037                use_services,
1038                reverse=True,
1039                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1040        _logger.debug("{}: {}".format(locator, sorted_roots))
1041        return sorted_roots
1042
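    # Hint-resolution sketch (illustrative): a five-character cluster hint
    # resolves to the federated keepproxy URL, while a UUID hint is looked
    # up in _gateway_services.
    #
    #   loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0+K@xyzzy')
    #   loc.hints   # -> ['K@xyzzy'], so weighted_service_roots(loc) probes
    #               # 'https://keep.xyzzy.arvadosapi.com/' first.
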
1043    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1044        # roots_map is a dictionary, mapping Keep service root strings
1045        # to _KeepService objects.  Poll for Keep services, and add any
1046        # new ones to roots_map.  Return the current list of local
1047        # root strings.
1048        headers.setdefault('Authorization', "Bearer %s" % (self.api_token,))
1049        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1050        for root in local_roots:
1051            if root not in roots_map:
1052                roots_map[root] = self._KeepService(
1053                    root, self._user_agent_pool,
1054                    upload_counter=self.upload_counter,
1055                    download_counter=self.download_counter,
1056                    headers=headers,
1057                    insecure=self.insecure)
1058        return local_roots
1059
1060    @staticmethod
1061    def _check_loop_result(result):
1062        # KeepClient RetryLoops should save results as a 2-tuple: the
1063        # actual result of the request, and the number of servers available
1064        # to receive the request this round.
1065        # This method returns True if there's a real result, False if
1066        # there are no more servers available, otherwise None.
1067        if isinstance(result, Exception):
1068            return None
1069        result, tried_server_count = result
1070        if (result is not None) and (result is not False):
1071            return True
1072        elif tried_server_count < 1:
1073            _logger.info("No more Keep services to try; giving up")
1074            return False
1075        else:
1076            return None
1077
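    # The retry protocol above, case by case (illustrative):
    #
    #   _check_loop_result(IOError('boom'))  # -> None  (error: retry)
    #   _check_loop_result((b'data', 3))     # -> True  (got a result: stop)
    #   _check_loop_result((None, 0))        # -> False (no servers: give up)
    #   _check_loop_result((None, 3))        # -> None  (all failed: retry)
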
1078    def get_from_cache(self, loc_s):
 1079        """Fetch a block only if it is in the cache; otherwise return None."""
1080        locator = KeepLocator(loc_s)
1081        slot = self.block_cache.get(locator.md5sum)
1082        if slot is not None and slot.ready.is_set():
1083            return slot.get()
1084        else:
1085            return None
1086
1087    def refresh_signature(self, loc):
1088        """Ask Keep to get the remote block and return its local signature"""
1089        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1090        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1091
1092    @retry.retry_method
1093    def head(self, loc_s, **kwargs):
1094        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1095
1096    @retry.retry_method
1097    def get(self, loc_s, **kwargs):
1098        return self._get_or_head(loc_s, method="GET", **kwargs)
1099
1100    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1101        """Get data from Keep.
1102
1103        This method fetches one or more blocks of data from Keep.  It
 1104        sends a request to each Keep service registered with the API
1105        server (or the proxy provided when this client was
1106        instantiated), then each service named in location hints, in
1107        sequence.  As soon as one service provides the data, it's
1108        returned.
1109
1110        Arguments:
1111        * loc_s: A string of one or more comma-separated locators to fetch.
1112          This method returns the concatenation of these blocks.
1113        * num_retries: The number of times to retry GET requests to
1114          *each* Keep server if it returns temporary failures, with
1115          exponential backoff.  Note that, in each loop, the method may try
1116          to fetch data from every available Keep service, along with any
1117          that are named in location hints in the locator.  The default value
1118          is set when the KeepClient is initialized.
1119        """
1120        if ',' in loc_s:
 1121            return b''.join(self.get(x) for x in loc_s.split(','))
1122
1123        self.get_counter.add(1)
1124
1125        request_id = (request_id or
1126                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1127                      arvados.util.new_request_id())
1128        if headers is None:
1129            headers = {}
1130        headers['X-Request-Id'] = request_id
1131
1132        slot = None
1133        blob = None
1134        try:
1135            locator = KeepLocator(loc_s)
1136            if method == "GET":
1137                while slot is None:
1138                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
1139                    if first:
1140                        # Fresh and empty "first time it is used" slot
1141                        break
1142                    if prefetch:
 1143                        # this is a request for a prefetch to fill in
 1144                        # the cache; we don't need to wait for the
 1145                        # result, so if it is already in flight, return
 1146                        # immediately.  Clear 'slot' to prevent the
 1147                        # finally block from calling block_cache.set().
1148                        if slot.ready.is_set():
1149                            slot.get()
1150                        slot = None
1151                        return None
1152
1153                    blob = slot.get()
1154                    if blob is not None:
1155                        self.hits_counter.add(1)
1156                        return blob
1157
1158                    # If blob is None, this means either
1159                    #
1160                    # (a) another thread was fetching this block and
1161                    # failed with an error or
1162                    #
1163                    # (b) cache thrashing caused the slot to be
1164                    # evicted (content set to None) by another thread
1165                    # between the call to reserve_cache() and get().
1166                    #
1167                    # We'll handle these cases by reserving a new slot
1168                    # and then doing a full GET request.
1169                    slot = None
1170
1171            self.misses_counter.add(1)
1172
1173            # If the locator has hints specifying a prefix (indicating a
1174            # remote keepproxy) or the UUID of a local gateway service,
1175            # read data from the indicated service(s) instead of the usual
1176            # list of local disk services.
 1177            hint_roots = ['https://keep.{}.arvadosapi.com/'.format(hint[2:])
1178                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1179            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1180                               for hint in locator.hints if (
1181                                       hint.startswith('K@') and
1182                                       len(hint) == 29 and
1183                                       self._gateway_services.get(hint[2:])
1184                                       )])
1185            # Map root URLs to their _KeepService objects.
1186            roots_map = {
1187                root: self._KeepService(root, self._user_agent_pool,
1188                                       upload_counter=self.upload_counter,
1189                                       download_counter=self.download_counter,
1190                                       headers=headers,
1191                                       insecure=self.insecure)
1192                for root in hint_roots
1193            }
1194
1195            # See #3147 for a discussion of the loop implementation.  Highlights:
1196            # * Refresh the list of Keep services after each failure, in case
1197            #   it's being updated.
1198            # * Retry until we succeed, we're out of retries, or every available
1199            #   service has returned permanent failure.
 1200            sorted_roots = []
 1201            # (roots_map was already seeded above with any hint-derived services)
1202            loop = retry.RetryLoop(num_retries, self._check_loop_result,
1203                                   backoff_start=2)
1204            for tries_left in loop:
1205                try:
1206                    sorted_roots = self.map_new_services(
1207                        roots_map, locator,
1208                        force_rebuild=(tries_left < num_retries),
1209                        need_writable=False,
1210                        headers=headers)
1211                except Exception as error:
1212                    loop.save_result(error)
1213                    continue
1214
1215                # Query _KeepService objects that haven't returned
1216                # permanent failure, in our specified shuffle order.
1217                services_to_try = [roots_map[root]
1218                                   for root in sorted_roots
1219                                   if roots_map[root].usable()]
1220                for keep_service in services_to_try:
1221                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1222                    if blob is not None:
1223                        break
1224                loop.save_result((blob, len(services_to_try)))
1225
1226            # Always cache the result, then return it if we succeeded.
1227            if loop.success():
1228                return blob
1229        finally:
1230            if slot is not None:
1231                self.block_cache.set(slot, blob)
1232
1233        # Q: Including 403 is necessary for the Keep tests to continue
1234        # passing, but maybe they should expect KeepReadError instead?
1235        not_founds = sum(1 for key in sorted_roots
1236                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1237        service_errors = ((key, roots_map[key].last_result()['error'])
1238                          for key in sorted_roots)
1239        if not roots_map:
1240            raise arvados.errors.KeepReadError(
1241                "[{}] failed to read {}: no Keep services available ({})".format(
1242                    request_id, loc_s, loop.last_result()))
1243        elif not_founds == len(sorted_roots):
1244            raise arvados.errors.NotFoundError(
1245                "[{}] {} not found".format(request_id, loc_s), service_errors)
1246        else:
1247            raise arvados.errors.KeepReadError(
1248                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1249
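    # Read-path usage sketch (assumes a configured Arvados environment; the
    # locator below is md5('foo') and would normally carry an +A signature):
    #
    #   kc = KeepClient()
    #   kc.get('acbd18db4cc2f85cedef654fccc4a4d8+3')   # -> b'foo'
    #   kc.head('acbd18db4cc2f85cedef654fccc4a4d8+3')  # -> True (raises on failure)
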
1250    @retry.retry_method
1251    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1252        """Save data in Keep.
1253
1254        This method will get a list of Keep services from the API server, and
1255        send the data to each one simultaneously in a new thread.  Once the
1256        uploads are finished, if enough copies are saved, this method returns
1257        the most recent HTTP response body.  If requests fail to upload
1258        enough copies, this method raises KeepWriteError.
1259
1260        Arguments:
 1261        * data: The data to upload, as bytes (a str will be encoded).
1262        * copies: The number of copies that the user requires be saved.
1263          Default 2.
1264        * num_retries: The number of times to retry PUT requests to
1265          *each* Keep server if it returns temporary failures, with
1266          exponential backoff.  The default value is set when the
1267          KeepClient is initialized.
1268        * classes: An optional list of storage class names where copies should
1269          be written.
1270        """
1271
1272        classes = classes or self._default_classes
1273
1274        if not isinstance(data, bytes):
1275            data = data.encode()
1276
1277        self.put_counter.add(1)
1278
1279        data_hash = hashlib.md5(data).hexdigest()
1280        loc_s = data_hash + '+' + str(len(data))
1281        if copies < 1:
1282            return loc_s
1283        locator = KeepLocator(loc_s)
1284
1285        request_id = (request_id or
1286                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1287                      arvados.util.new_request_id())
1288        headers = {
1289            'X-Request-Id': request_id,
1290            'X-Keep-Desired-Replicas': str(copies),
1291        }
1292        roots_map = {}
1293        loop = retry.RetryLoop(num_retries, self._check_loop_result,
1294                               backoff_start=2)
1295        done_copies = 0
1296        done_classes = []
1297        for tries_left in loop:
1298            try:
1299                sorted_roots = self.map_new_services(
1300                    roots_map, locator,
1301                    force_rebuild=(tries_left < num_retries),
1302                    need_writable=True,
1303                    headers=headers)
1304            except Exception as error:
1305                loop.save_result(error)
1306                continue
1307
1308            pending_classes = []
1309            if done_classes is not None:
1310                pending_classes = list(set(classes) - set(done_classes))
1311            writer_pool = KeepClient._KeepWriterThreadPool(
1312                data=data,
1313                data_hash=data_hash,
1314                copies=copies - done_copies,
1315                max_service_replicas=self.max_replicas_per_service,
1316                timeout=self.current_timeout(num_retries - tries_left),
1317                classes=pending_classes,
1318            )
1319            for service_root, ks in [(root, roots_map[root])
1320                                     for root in sorted_roots]:
1321                if ks.finished():
1322                    continue
1323                writer_pool.add_task(ks, service_root)
1324            writer_pool.join()
1325            pool_copies, pool_classes = writer_pool.done()
1326            done_copies += pool_copies
1327            if (done_classes is not None) and (pool_classes is not None):
1328                done_classes += pool_classes
1329                loop.save_result(
1330                    (done_copies >= copies and set(done_classes) == set(classes),
1331                    writer_pool.total_task_nr))
1332            else:
1333                # Old keepstore contacted without storage classes support:
1334                # success is determined only by successful copies.
1335                #
1336                # Disable storage classes tracking from this point forward.
1337                if not self._storage_classes_unsupported_warning:
1338                    self._storage_classes_unsupported_warning = True
1339                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1340                done_classes = None
1341                loop.save_result(
1342                    (done_copies >= copies, writer_pool.total_task_nr))
1343
1344        if loop.success():
1345            return writer_pool.response()
1346        if not roots_map:
1347            raise arvados.errors.KeepWriteError(
1348                "[{}] failed to write {}: no Keep services available ({})".format(
1349                    request_id, data_hash, loop.last_result()))
1350        else:
1351            service_errors = ((key, roots_map[key].last_result()['error'])
1352                              for key in sorted_roots
1353                              if roots_map[key].last_result()['error'])
1354            raise arvados.errors.KeepWriteError(
1355                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1356                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1357
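    # Write-path usage sketch (assumes a configured Arvados environment;
    # 'archive' is a hypothetical storage class name):
    #
    #   kc = KeepClient()
    #   loc = kc.put(b'foo', copies=2)
    #   # loc is a signed locator, e.g. 'acbd18db4cc2f85cedef654fccc4a4d8+3+A...'
    #   kc.put(b'foo', copies=1, classes=['archive'])
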
1358    def _block_prefetch_worker(self):
1359        """The background downloader thread."""
1360        while True:
1361            try:
1362                b = self._prefetch_queue.get()
1363                if b is None:
1364                    return
1365                self.get(b, prefetch=True)
1366            except Exception:
1367                _logger.exception("Exception doing block prefetch")
1368
1369    def _start_prefetch_threads(self):
1370        if self._prefetch_threads is None:
1371            with self.lock:
1372                if self._prefetch_threads is not None:
1373                    return
1374                self._prefetch_queue = queue.Queue()
1375                self._prefetch_threads = []
1376                for i in range(0, self.num_prefetch_threads):
1377                    thread = threading.Thread(target=self._block_prefetch_worker)
1378                    self._prefetch_threads.append(thread)
1379                    thread.daemon = True
1380                    thread.start()
1381
1382    def block_prefetch(self, locator):
 1383        """Queue a block for download in the background.
 1384        This relies on the fact that KeepClient implements a block cache,
 1385        so repeated requests for the same block will not result in repeated
 1386        downloads (unless the block is evicted from the cache).  This method
1387        does not block.
1388        """
1389
1390        if self.block_cache.get(locator) is not None:
1391            return
1392
1393        self._start_prefetch_threads()
1394        self._prefetch_queue.put(locator)
1395
1396    def stop_prefetch_threads(self):
1397        with self.lock:
1398            if self._prefetch_threads is not None:
1399                for t in self._prefetch_threads:
1400                    self._prefetch_queue.put(None)
1401                for t in self._prefetch_threads:
1402                    t.join()
1403            self._prefetch_threads = None
1404            self._prefetch_queue = None
1405
1406    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1407        """A stub for put().
1408
1409        This method is used in place of the real put() method when
1410        using local storage (see constructor's local_store argument).
1411
1412        copies and num_retries arguments are ignored: they are here
1413        only for the sake of offering the same call signature as
1414        put().
1415
1416        Data stored this way can be retrieved via local_store_get().
1417        """
1418        md5 = hashlib.md5(data).hexdigest()
1419        locator = '%s+%d' % (md5, len(data))
1420        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1421            f.write(data)
1422        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1423                  os.path.join(self.local_store, md5))
1424        return locator
1425
1426    def local_store_get(self, loc_s, num_retries=None):
1427        """Companion to local_store_put()."""
1428        try:
1429            locator = KeepLocator(loc_s)
1430        except ValueError:
1431            raise arvados.errors.NotFoundError(
1432                "Invalid data locator: '%s'" % loc_s)
1433        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1434            return b''
1435        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1436            return f.read()
1437
1438    def local_store_head(self, loc_s, num_retries=None):
1439        """Companion to local_store_put()."""
1440        try:
1441            locator = KeepLocator(loc_s)
1442        except ValueError:
1443            raise arvados.errors.NotFoundError(
1444                "Invalid data locator: '%s'" % loc_s)
1445        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1446            return True
1447        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1448            return True
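
    # Local-store sketch: with local_store set (or $KEEP_LOCAL_STORE in the
    # environment), put()/get()/head() dispatch to the local_store_* stubs
    # above and never contact Keep ('/tmp/keep_local' is hypothetical):
    #
    #   kc = KeepClient(local_store='/tmp/keep_local')
    #   loc = kc.put(b'foo')   # writes /tmp/keep_local/acbd18db...
    #   kc.get(loc)            # -> b'foo'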