arvados.keep

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4
   5import copy
   6import collections
   7import datetime
   8import hashlib
   9import errno
  10import io
  11import logging
  12import math
  13import os
  14import pycurl
  15import queue
  16import re
  17import socket
  18import ssl
  19import sys
  20import threading
  21import resource
  22import urllib.parse
  23import traceback
  24import weakref
  25
  26from io import BytesIO
  27
  28import arvados
  29import arvados.config as config
  30import arvados.errors
  31import arvados.retry as retry
  32import arvados.util
  33import arvados.diskcache
  34from arvados._pycurlhelper import PyCurlHelper
  35from . import timer
  36
  37_logger = logging.getLogger('arvados.keep')
  38global_client_object = None
  39
  40# Monkey patch TCP constants when not available (macOS). Values sourced from:
  41# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
  42if sys.platform == 'darwin':
  43    if not hasattr(socket, 'TCP_KEEPALIVE'):
  44        socket.TCP_KEEPALIVE = 0x010
  45    if not hasattr(socket, 'TCP_KEEPINTVL'):
  46        socket.TCP_KEEPINTVL = 0x101
  47    if not hasattr(socket, 'TCP_KEEPCNT'):
  48        socket.TCP_KEEPCNT = 0x102
  49
  50class KeepLocator(object):
  51    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
  52    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
  53
  54    def __init__(self, locator_str):
  55        self.hints = []
  56        self._perm_sig = None
  57        self._perm_expiry = None
  58        pieces = iter(locator_str.split('+'))
  59        self.md5sum = next(pieces)
  60        try:
  61            self.size = int(next(pieces))
  62        except StopIteration:
  63            self.size = None
  64        for hint in pieces:
  65            if self.HINT_RE.match(hint) is None:
  66                raise ValueError("invalid hint format: {}".format(hint))
  67            elif hint.startswith('A'):
  68                self.parse_permission_hint(hint)
  69            else:
  70                self.hints.append(hint)
  71
  72    def __str__(self):
  73        return '+'.join(
  74            str(s)
  75            for s in [self.md5sum, self.size,
  76                      self.permission_hint()] + self.hints
  77            if s is not None)
  78
  79    def stripped(self):
  80        if self.size is not None:
  81            return "%s+%i" % (self.md5sum, self.size)
  82        else:
  83            return self.md5sum
  84
  85    def _make_hex_prop(name, length):
  86        # Build and return a new property with the given name that
  87        # must be a hex string of the given length.
  88        data_name = '_{}'.format(name)
  89        def getter(self):
  90            return getattr(self, data_name)
  91        def setter(self, hex_str):
  92            if not arvados.util.is_hex(hex_str, length):
  93                raise ValueError("{} is not a {}-digit hex string: {!r}".
  94                                 format(name, length, hex_str))
  95            setattr(self, data_name, hex_str)
  96        return property(getter, setter)
  97
  98    md5sum = _make_hex_prop('md5sum', 32)
  99    perm_sig = _make_hex_prop('perm_sig', 40)
 100
 101    @property
 102    def perm_expiry(self):
 103        return self._perm_expiry
 104
 105    @perm_expiry.setter
 106    def perm_expiry(self, value):
 107        if not arvados.util.is_hex(value, 1, 8):
 108            raise ValueError(
 109                "permission timestamp must be a hex Unix timestamp: {}".
 110                format(value))
 111        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
 112
 113    def permission_hint(self):
 114        data = [self.perm_sig, self.perm_expiry]
 115        if None in data:
 116            return None
 117        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
 118        return "A{}@{:08x}".format(*data)
 119
 120    def parse_permission_hint(self, s):
 121        try:
 122            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
 123        except (IndexError, ValueError):  # missing '@' raises ValueError
 124            raise ValueError("bad permission hint {}".format(s))
 125
 126    def permission_expired(self, as_of_dt=None):
 127        if self.perm_expiry is None:
 128            return False
 129        elif as_of_dt is None:
 130            as_of_dt = datetime.datetime.now()
 131        return self.perm_expiry <= as_of_dt
 132
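    # Example (illustrative, not part of the module): parsing and
    # re-serializing a signed locator with KeepLocator.  The hash, size,
    # signature, and expiry below are made-up sample values.
    #
    #     loc = KeepLocator(
    #         'acbd18db4cc2f85cedef654fccc4a4d8+3'
    #         '+A37b51d194a7513e45b56f6524f2d51f200e95891@65f9a1e0')
    #     loc.md5sum                 # 'acbd18db4cc2f85cedef654fccc4a4d8'
    #     loc.size                   # 3
    #     loc.stripped()             # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
    #     loc.permission_expired()   # True once the expiry time has passed
    #     str(loc)                   # round-trips the full signed locator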
 133
 134class Keep(object):
 135    """Simple interface to a global KeepClient object.
 136
 137    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
 138    own API client.  The global KeepClient will build an API client from the
 139    current Arvados configuration, which may not match the one you built.
 140    """
 141    _last_key = None
 142
 143    @classmethod
 144    def global_client_object(cls):
 145        global global_client_object
 146        # Previously, KeepClient would change its behavior at runtime based
 147        # on these configuration settings.  We simulate that behavior here
 148        # by checking the values and returning a new KeepClient if any of
 149        # them have changed.
 150        key = (config.get('ARVADOS_API_HOST'),
 151               config.get('ARVADOS_API_TOKEN'),
 152               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
 153               config.get('ARVADOS_KEEP_PROXY'),
 154               os.environ.get('KEEP_LOCAL_STORE'))
 155        if (global_client_object is None) or (cls._last_key != key):
 156            global_client_object = KeepClient()
 157            cls._last_key = key
 158        return global_client_object
 159
 160    @staticmethod
 161    def get(locator, **kwargs):
 162        return Keep.global_client_object().get(locator, **kwargs)
 163
 164    @staticmethod
 165    def put(data, **kwargs):
 166        return Keep.global_client_object().put(data, **kwargs)
 167
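    # Example (illustrative): the recommended replacement for these deprecated
    # helpers is to build a KeepClient from your own API client, so the Keep
    # service list and credentials match the client you are already using.
    #
    #     api = arvados.api('v1')
    #     keep_client = KeepClient(api_client=api)
    #     locator = keep_client.put(b'hello world')
    #     data = keep_client.get(locator)
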
 168class KeepBlockCache(object):
 169    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
 170        self.cache_max = cache_max
 171        self._cache = collections.OrderedDict()
 172        self._cache_lock = threading.Lock()
 173        self._max_slots = max_slots
 174        self._disk_cache = disk_cache
 175        self._disk_cache_dir = disk_cache_dir
 176        self._cache_updating = threading.Condition(self._cache_lock)
 177
 178        if self._disk_cache and self._disk_cache_dir is None:
 179            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
 180            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
 181
 182        if self._max_slots == 0:
 183            if self._disk_cache:
 184                # Each block uses two file descriptors, one used to
 185                # open it initially and hold the flock(), and a second
 186                # hidden one used by mmap().
 187                #
 188                # Set max slots to 1/8 of maximum file handles.  This
 189                # means we'll use at most 1/4 of total file handles.
 190                #
 191                # NOFILE typically defaults to 1024 on Linux so this
 192                # is 128 slots (256 file handles), which means we can
 193                # cache up to 8 GiB of 64 MiB blocks.  This leaves
 194                # 768 file handles for sockets and other stuff.
 195                #
 196                # When we want the ability to have more cache (e.g. in
 197                # arv-mount) we'll increase rlimit before calling
 198                # this.
 199                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
 200            else:
 201                # RAM cache slots
 202                self._max_slots = 512
 203
 204        if self.cache_max == 0:
 205            if self._disk_cache:
 206                fs = os.statvfs(self._disk_cache_dir)
 207                # Calculation of available space incorporates existing cache usage
 208                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
 209                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
 210                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
 211                # pick smallest of:
 212                # 10% of total disk size
 213                # 25% of available space
 214                # max_slots * 64 MiB
 215                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
 216            else:
 217                # 256 MiB in RAM
 218                self.cache_max = (256 * 1024 * 1024)
 219
 220        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
 221
 222        self.cache_total = 0
 223        if self._disk_cache:
 224            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
 225            for slot in self._cache.values():
 226                self.cache_total += slot.size()
 227            self.cap_cache()
 228
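        # Worked example (illustrative numbers): with the defaults above on a
        # host where RLIMIT_NOFILE is 1024, the disk cache gets 1024/8 = 128
        # slots.  On a 500 GiB filesystem with 200 GiB free and no existing
        # cache, the candidate limits are:
        #
        #     maxdisk = 0.10 * 500 GiB = 50 GiB    # 10% of total disk
        #     avail   = 200 GiB / 4    = 50 GiB    # 25% of available space
        #     slots   = 128 * 64 MiB   = 8 GiB     # max_slots * block size
        #
        # so cache_max = min(50 GiB, 50 GiB, 8 GiB) = 8 GiB, well above the
        # 64 MiB floor applied at the end.
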
 229    class CacheSlot(object):
 230        __slots__ = ("locator", "ready", "content")
 231
 232        def __init__(self, locator):
 233            self.locator = locator
 234            self.ready = threading.Event()
 235            self.content = None
 236
 237        def get(self):
 238            self.ready.wait()
 239            return self.content
 240
 241        def set(self, value):
 242            if self.content is not None:
 243                return False
 244            self.content = value
 245            self.ready.set()
 246            return True
 247
 248        def size(self):
 249            if self.content is None:
 250                return 0
 251            else:
 252                return len(self.content)
 253
 254        def evict(self):
 255            self.content = None
 256
 257
 258    def _resize_cache(self, cache_max, max_slots):
 259        # Try to ensure the contents of the cache do not exceed
 260        # the supplied maximums.
 261
 262        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
 263            return
 264
 265        _evict_candidates = collections.deque(self._cache.values())
 266        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
 267            slot = _evict_candidates.popleft()
 268            if not slot.ready.is_set():
 269                continue
 270
 271            sz = slot.size()
 272            slot.evict()
 273            self.cache_total -= sz
 274            del self._cache[slot.locator]
 275
 276
 277    def cap_cache(self):
 278        '''Cap the cache size to self.cache_max'''
 279        with self._cache_updating:
 280            self._resize_cache(self.cache_max, self._max_slots)
 281            self._cache_updating.notify_all()
 282
 283    def _get(self, locator):
 284        # Test if the locator is already in the cache
 285        if locator in self._cache:
 286            n = self._cache[locator]
 287            if n.ready.is_set() and n.content is None:
 288                del self._cache[n.locator]
 289                return None
 290            self._cache.move_to_end(locator)
 291            return n
 292        if self._disk_cache:
 293            # see if it exists on disk
 294            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
 295            if n is not None:
 296                self._cache[n.locator] = n
 297                self.cache_total += n.size()
 298                return n
 299        return None
 300
 301    def get(self, locator):
 302        with self._cache_lock:
 303            return self._get(locator)
 304
 305    def reserve_cache(self, locator):
 306        '''Reserve a cache slot for the specified locator,
 307        or return the existing slot.'''
 308        with self._cache_updating:
 309            n = self._get(locator)
 310            if n:
 311                return n, False
 312            else:
 313                # Add a new cache slot for the locator
 314                self._resize_cache(self.cache_max, self._max_slots-1)
 315                while len(self._cache) >= self._max_slots:
 316                    # If there isn't a slot available, wait for
 317                    # another thread to release one of the cache
 318                    # slots.  Wake up after 200 ms or when notified
 319                    # by another thread.
 320                    self._cache_updating.wait(timeout=0.2)
 321                    self._resize_cache(self.cache_max, self._max_slots-1)
 322
 323                if self._disk_cache:
 324                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
 325                else:
 326                    n = KeepBlockCache.CacheSlot(locator)
 327                self._cache[n.locator] = n
 328                return n, True
 329
 330    def set(self, slot, blob):
 331        try:
 332            if slot.set(blob):
 333                self.cache_total += slot.size()
 334            return
 335        except OSError as e:
 336            if e.errno == errno.ENOMEM:
 337                # Reduce max slots to current - 4, cap cache and retry
 338                with self._cache_lock:
 339                    self._max_slots = max(4, len(self._cache) - 4)
 340            elif e.errno == errno.ENOSPC:
 341                # Reduce disk max space to current - 256 MiB, cap cache and retry
 342                with self._cache_lock:
 343                    sm = sum(st.size() for st in self._cache.values())
 344                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
 345            elif e.errno == errno.ENODEV:
 346                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
 347        except Exception as e:
 348            pass
 349        finally:
 350            # Check whether we should evict anything from the cache:
 351            # either we added a new block, or an error handler above
 352            # adjusted the limits down, so we might need to push
 353            # something out.
 354            self.cap_cache()
 355
 356        try:
 357            # We only get here if the first attempt raised an exception.
 358            # The exception handler above may have adjusted the limits
 359            # downward to free up resources, so this second attempt has
 360            # a chance to succeed.
 361            if slot.set(blob):
 362                self.cache_total += slot.size()
 363        except Exception as e:
 364            # It failed again.  Give up.
 365            slot.set(None)
 366            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
 367
 368        self.cap_cache()
 369
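    # Example (illustrative): the reserve/set/get protocol KeepClient follows
    # when filling the block cache.  'locator_md5' and 'fetch_block' are
    # hypothetical stand-ins for a real locator hash and transfer function.
    #
    #     cache = KeepBlockCache(disk_cache=False)
    #     slot, first = cache.reserve_cache(locator_md5)
    #     if first:
    #         cache.set(slot, fetch_block(locator_md5))  # fill the new slot
    #     data = slot.get()  # blocks until some thread has called set()
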
 370class Counter(object):
 371    def __init__(self, v=0):
 372        self._lk = threading.Lock()
 373        self._val = v
 374
 375    def add(self, v):
 376        with self._lk:
 377            self._val += v
 378
 379    def get(self):
 380        with self._lk:
 381            return self._val
 382
 383
 384class KeepClient(object):
 385    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
 386    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
 387
 388    class KeepService(PyCurlHelper):
 389        """Make requests to a single Keep service, and track results.
 390
 391        A KeepService is intended to last long enough to perform one
 392        transaction (GET or PUT) against one Keep service. This can
 393        involve calling either get() or put() multiple times in order
 394        to retry after transient failures. However, calling both get()
 395        and put() on a single instance -- or using the same instance
 396        to access two different Keep services -- will not produce
 397        sensible behavior.
 398        """
 399
 400        HTTP_ERRORS = (
 401            socket.error,
 402            ssl.SSLError,
 403            arvados.errors.HttpError,
 404        )
 405
 406        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
 407                     upload_counter=None,
 408                     download_counter=None,
 409                     headers={},
 410                     insecure=False):
 411            super(KeepClient.KeepService, self).__init__()
 412            self.root = root
 413            self._user_agent_pool = user_agent_pool
 414            self._result = {'error': None}
 415            self._usable = True
 416            self._session = None
 417            self._socket = None
 418            self.get_headers = {'Accept': 'application/octet-stream'}
 419            self.get_headers.update(headers)
 420            self.put_headers = headers
 421            self.upload_counter = upload_counter
 422            self.download_counter = download_counter
 423            self.insecure = insecure
 424
 425        def usable(self):
 426            """Is it worth attempting a request?"""
 427            return self._usable
 428
 429        def finished(self):
 430            """Did the request succeed or encounter permanent failure?"""
 431            return self._result['error'] == False or not self._usable
 432
 433        def last_result(self):
 434            return self._result
 435
 436        def _get_user_agent(self):
 437            try:
 438                return self._user_agent_pool.get(block=False)
 439            except queue.Empty:
 440                return pycurl.Curl()
 441
 442        def _put_user_agent(self, ua):
 443            try:
 444                ua.reset()
 445                self._user_agent_pool.put(ua, block=False)
 446            except:
 447                ua.close()
 448
 449        def get(self, locator, method="GET", timeout=None):
 450            # locator is a KeepLocator object.
 451            url = self.root + str(locator)
 452            _logger.debug("Request: %s %s", method, url)
 453            curl = self._get_user_agent()
 454            ok = None
 455            try:
 456                with timer.Timer() as t:
 457                    self._headers = {}
 458                    response_body = BytesIO()
 459                    curl.setopt(pycurl.NOSIGNAL, 1)
 460                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 461                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 462                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 463                    curl.setopt(pycurl.HTTPHEADER, [
 464                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
 465                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 466                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 467                    if self.insecure:
 468                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 469                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 470                    else:
 471                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 472                    if method == "HEAD":
 473                        curl.setopt(pycurl.NOBODY, True)
 474                    else:
 475                        curl.setopt(pycurl.HTTPGET, True)
 476                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 477
 478                    try:
 479                        curl.perform()
 480                    except Exception as e:
 481                        raise arvados.errors.HttpError(0, str(e))
 482                    finally:
 483                        if self._socket:
 484                            self._socket.close()
 485                            self._socket = None
 486                    self._result = {
 487                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 488                        'body': response_body.getvalue(),
 489                        'headers': self._headers,
 490                        'error': False,
 491                    }
 492
 493                ok = retry.check_http_response_success(self._result['status_code'])
 494                if not ok:
 495                    self._result['error'] = arvados.errors.HttpError(
 496                        self._result['status_code'],
 497                        self._headers.get('x-status-line', 'Error'))
 498            except self.HTTP_ERRORS as e:
 499                self._result = {
 500                    'error': e,
 501                }
 502            self._usable = ok != False
 503            if self._result.get('status_code', None):
 504                # The client worked well enough to get an HTTP status
 505                # code, so presumably any problems are just on the
 506                # server side and it's OK to reuse the client.
 507                self._put_user_agent(curl)
 508            else:
 509                # Don't return this client to the pool, in case it's
 510                # broken.
 511                curl.close()
 512            if not ok:
 513                _logger.debug("Request fail: GET %s => %s: %s",
 514                              url, type(self._result['error']), str(self._result['error']))
 515                return None
 516            if method == "HEAD":
 517                _logger.info("HEAD %s: %s bytes",
 518                         self._result['status_code'],
 519                         self._result.get('content-length'))
 520                if self._result['headers'].get('x-keep-locator'):
 521                    # This is a response to a remote block copy request, return
 522                    # the local copy block locator.
 523                    return self._result['headers'].get('x-keep-locator')
 524                return True
 525
 526            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
 527                         self._result['status_code'],
 528                         len(self._result['body']),
 529                         t.msecs,
 530                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 531
 532            if self.download_counter:
 533                self.download_counter.add(len(self._result['body']))
 534            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
 535            if resp_md5 != locator.md5sum:
 536                _logger.warning("Checksum fail: md5(%s) = %s",
 537                                url, resp_md5)
 538                self._result['error'] = arvados.errors.HttpError(
 539                    0, 'Checksum fail')
 540                return None
 541            return self._result['body']
 542
 543        def put(self, hash_s, body, timeout=None, headers={}):
 544            put_headers = copy.copy(self.put_headers)
 545            put_headers.update(headers)
 546            url = self.root + hash_s
 547            _logger.debug("Request: PUT %s", url)
 548            curl = self._get_user_agent()
 549            ok = None
 550            try:
 551                with timer.Timer() as t:
 552                    self._headers = {}
 553                    body_reader = BytesIO(body)
 554                    response_body = BytesIO()
 555                    curl.setopt(pycurl.NOSIGNAL, 1)
 556                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 557                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 558                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 559                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
 560                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
 561                    # response) instead of sending the request body immediately.
 562                    # This allows the server to reject the request if the request
 563                    # is invalid or the server is read-only, without waiting for
 564                    # the client to send the entire block.
 565                    curl.setopt(pycurl.UPLOAD, True)
 566                    curl.setopt(pycurl.INFILESIZE, len(body))
 567                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
 568                    curl.setopt(pycurl.HTTPHEADER, [
 569                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
 570                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 571                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 572                    if self.insecure:
 573                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 574                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 575                    else:
 576                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 577                    self._setcurltimeouts(curl, timeout)
 578                    try:
 579                        curl.perform()
 580                    except Exception as e:
 581                        raise arvados.errors.HttpError(0, str(e))
 582                    finally:
 583                        if self._socket:
 584                            self._socket.close()
 585                            self._socket = None
 586                    self._result = {
 587                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 588                        'body': response_body.getvalue().decode('utf-8'),
 589                        'headers': self._headers,
 590                        'error': False,
 591                    }
 592                ok = retry.check_http_response_success(self._result['status_code'])
 593                if not ok:
 594                    self._result['error'] = arvados.errors.HttpError(
 595                        self._result['status_code'],
 596                        self._headers.get('x-status-line', 'Error'))
 597            except self.HTTP_ERRORS as e:
 598                self._result = {
 599                    'error': e,
 600                }
 601            self._usable = ok != False # still usable if ok is True or None
 602            if self._result.get('status_code', None):
 603                # Client is functional. See comment in get().
 604                self._put_user_agent(curl)
 605            else:
 606                curl.close()
 607            if not ok:
 608                _logger.debug("Request fail: PUT %s => %s: %s",
 609                              url, type(self._result['error']), str(self._result['error']))
 610                return False
 611            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
 612                         self._result['status_code'],
 613                         len(body),
 614                         t.msecs,
 615                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
 616            if self.upload_counter:
 617                self.upload_counter.add(len(body))
 618            return True
 619
 620
 621    class KeepWriterQueue(queue.Queue):
 622        def __init__(self, copies, classes=[]):
 623            queue.Queue.__init__(self) # Old-style superclass
 624            self.wanted_copies = copies
 625            self.wanted_storage_classes = classes
 626            self.successful_copies = 0
 627            self.confirmed_storage_classes = {}
 628            self.response = None
 629            self.storage_classes_tracking = True
 630            self.queue_data_lock = threading.RLock()
 631            self.pending_tries = max(copies, len(classes))
 632            self.pending_tries_notification = threading.Condition()
 633
 634        def write_success(self, response, replicas_nr, classes_confirmed):
 635            with self.queue_data_lock:
 636                self.successful_copies += replicas_nr
 637                if classes_confirmed is None:
 638                    self.storage_classes_tracking = False
 639                elif self.storage_classes_tracking:
 640                    for st_class, st_copies in classes_confirmed.items():
 641                        try:
 642                            self.confirmed_storage_classes[st_class] += st_copies
 643                        except KeyError:
 644                            self.confirmed_storage_classes[st_class] = st_copies
 645                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
 646                self.response = response
 647            with self.pending_tries_notification:
 648                self.pending_tries_notification.notify_all()
 649
 650        def write_fail(self, ks):
 651            with self.pending_tries_notification:
 652                self.pending_tries += 1
 653                self.pending_tries_notification.notify()
 654
 655        def pending_copies(self):
 656            with self.queue_data_lock:
 657                return self.wanted_copies - self.successful_copies
 658
 659        def satisfied_classes(self):
 660            with self.queue_data_lock:
 661                if not self.storage_classes_tracking:
 662                    # Tell the outer loop that storage class tracking
 663                    # is disabled.
 664                    return None
 665            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
 666
 667        def pending_classes(self):
 668            with self.queue_data_lock:
 669                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
 670                    return []
 671                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
 672                for st_class, st_copies in self.confirmed_storage_classes.items():
 673                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
 674                        unsatisfied_classes.remove(st_class)
 675                return unsatisfied_classes
 676
 677        def get_next_task(self):
 678            with self.pending_tries_notification:
 679                while True:
 680                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
 681                        # This notify_all() is unnecessary --
 682                        # write_success() already called notify_all()
 683                        # when pending<1 became true, so it's not
 684                        # possible for any other thread to be in
 685                        # wait() now -- but it's cheap insurance
 686                        # against deadlock so we do it anyway:
 687                        self.pending_tries_notification.notify_all()
 688                        # Drain the queue and then raise Queue.Empty
 689                        while True:
 690                            self.get_nowait()
 691                            self.task_done()
 692                    elif self.pending_tries > 0:
 693                        service, service_root = self.get_nowait()
 694                        if service.finished():
 695                            self.task_done()
 696                            continue
 697                        self.pending_tries -= 1
 698                        return service, service_root
 699                    elif self.empty():
 700                        self.pending_tries_notification.notify_all()
 701                        raise queue.Empty
 702                    else:
 703                        self.pending_tries_notification.wait()
 704
 705
 706    class KeepWriterThreadPool(object):
 707        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
 708            self.total_task_nr = 0
 709            if (not max_service_replicas) or (max_service_replicas >= copies):
 710                num_threads = 1
 711            else:
 712                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
 713            _logger.debug("Pool max threads is %d", num_threads)
 714            self.workers = []
 715            self.queue = KeepClient.KeepWriterQueue(copies, classes)
 716            # Create workers
 717            for _ in range(num_threads):
 718                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
 719                self.workers.append(w)
 720
 721        def add_task(self, ks, service_root):
 722            self.queue.put((ks, service_root))
 723            self.total_task_nr += 1
 724
 725        def done(self):
 726            return self.queue.successful_copies, self.queue.satisfied_classes()
 727
 728        def join(self):
 729            # Start workers
 730            for worker in self.workers:
 731                worker.start()
 732            # Wait for finished work
 733            self.queue.join()
 734
 735        def response(self):
 736            return self.queue.response
 737
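        # Sketch (illustrative): roughly how put() drives this pool.  One task
        # is queued per candidate service; workers PUT the block until enough
        # copies and storage classes are confirmed.  'sorted_roots' and
        # 'roots_map' stand for the shuffled service list and its KeepService
        # objects.
        #
        #     pool = KeepClient.KeepWriterThreadPool(data, data_hash, copies=2,
        #                                            max_service_replicas=1)
        #     for root in sorted_roots:
        #         pool.add_task(roots_map[root], root)
        #     pool.join()
        #     copies_done, classes_done = pool.done()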
 738
 739    class KeepWriterThread(threading.Thread):
 740        class TaskFailed(RuntimeError): pass
 741
 742        def __init__(self, queue, data, data_hash, timeout=None):
 743            super(KeepClient.KeepWriterThread, self).__init__()
 744            self.timeout = timeout
 745            self.queue = queue
 746            self.data = data
 747            self.data_hash = data_hash
 748            self.daemon = True
 749
 750        def run(self):
 751            while True:
 752                try:
 753                    service, service_root = self.queue.get_next_task()
 754                except queue.Empty:
 755                    return
 756                try:
 757                    locator, copies, classes = self.do_task(service, service_root)
 758                except Exception as e:
 759                    if not isinstance(e, self.TaskFailed):
 760                        _logger.exception("Exception in KeepWriterThread")
 761                    self.queue.write_fail(service)
 762                else:
 763                    self.queue.write_success(locator, copies, classes)
 764                finally:
 765                    self.queue.task_done()
 766
 767        def do_task(self, service, service_root):
 768            classes = self.queue.pending_classes()
 769            headers = {}
 770            if len(classes) > 0:
 771                classes.sort()
 772                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
 773            success = bool(service.put(self.data_hash,
 774                                        self.data,
 775                                        timeout=self.timeout,
 776                                        headers=headers))
 777            result = service.last_result()
 778
 779            if not success:
 780                if result.get('status_code'):
 781                    _logger.debug("Request fail: PUT %s => %s %s",
 782                                  self.data_hash,
 783                                  result.get('status_code'),
 784                                  result.get('body'))
 785                raise self.TaskFailed()
 786
 787            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
 788                          str(threading.current_thread()),
 789                          self.data_hash,
 790                          len(self.data),
 791                          service_root)
 792            try:
 793                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
 794            except (KeyError, ValueError):
 795                replicas_stored = 1
 796
 797            classes_confirmed = {}
 798            try:
 799                scch = result['headers']['x-keep-storage-classes-confirmed']
 800                for confirmation in scch.replace(' ', '').split(','):
 801                    if '=' in confirmation:
 802                        stored_class, stored_copies = confirmation.split('=')[:2]
 803                        classes_confirmed[stored_class] = int(stored_copies)
 804            except (KeyError, ValueError):
 805                # Storage classes confirmed header missing or corrupt
 806                classes_confirmed = None
 807
 808            return result['body'].strip(), replicas_stored, classes_confirmed
 809
 810
 811    def __init__(self, api_client=None, proxy=None,
 812                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
 813                 api_token=None, local_store=None, block_cache=None,
 814                 num_retries=10, session=None, num_prefetch_threads=None):
 815        """Initialize a new KeepClient.
 816
 817        Arguments:
 818        :api_client:
 819          The API client to use to find Keep services.  If not
 820          provided, KeepClient will build one from available Arvados
 821          configuration.
 822
 823        :proxy:
 824          If specified, this KeepClient will send requests to this Keep
 825          proxy.  Otherwise, KeepClient will fall back to the setting of the
 826          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
 827          If you want to KeepClient does not use a proxy, pass in an empty
 828          string.
 829
 830        :timeout:
 831          The initial timeout (in seconds) for HTTP requests to Keep
 832          non-proxy servers.  A tuple of three floats is interpreted as
 833          (connection_timeout, read_timeout, minimum_bandwidth). A connection
 834          will be aborted if the average traffic rate falls below
 835          minimum_bandwidth bytes per second over an interval of read_timeout
 836          seconds. Because timeouts are often a result of transient server
 837          load, the actual connection timeout will be increased by a factor
 838          of two on each retry.
 839          Default: (2, 256, 32768).
 840
 841        :proxy_timeout:
 842          The initial timeout (in seconds) for HTTP requests to
 843          Keep proxies. A tuple of three floats is interpreted as
 844          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
 845          described above for adjusting connection timeouts on retry also
 846          applies.
 847          Default: (20, 256, 32768).
 848
 849        :api_token:
 850          If you're not using an API client, but only talking
 851          directly to a Keep proxy, this parameter specifies an API token
 852          to authenticate Keep requests.  It is an error to specify both
 853          api_client and api_token.  If you specify neither, KeepClient
 854          will use one available from the Arvados configuration.
 855
 856        :local_store:
 857          If specified, this KeepClient will bypass Keep
 858          services, and save data to the named directory.  If unspecified,
 859          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
 860          environment variable.  If you want to ensure KeepClient does not
 861          use local storage, pass in an empty string.  This is primarily
 862          intended to mock a server for testing.
 863
 864        :num_retries:
 865          The default number of times to retry failed requests.
 866          This will be used as the default num_retries value when get() and
 867          put() are called.  Default 10.
 868        """
 869        self.lock = threading.Lock()
 870        if proxy is None:
 871            if config.get('ARVADOS_KEEP_SERVICES'):
 872                proxy = config.get('ARVADOS_KEEP_SERVICES')
 873            else:
 874                proxy = config.get('ARVADOS_KEEP_PROXY')
 875        if api_token is None:
 876            if api_client is None:
 877                api_token = config.get('ARVADOS_API_TOKEN')
 878            else:
 879                api_token = api_client.api_token
 880        elif api_client is not None:
 881            raise ValueError(
 882                "can't build KeepClient with both API client and token")
 883        if local_store is None:
 884            local_store = os.environ.get('KEEP_LOCAL_STORE')
 885
 886        if api_client is None:
 887            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
 888        else:
 889            self.insecure = api_client.insecure
 890
 891        self.block_cache = block_cache if block_cache else KeepBlockCache()
 892        self.timeout = timeout
 893        self.proxy_timeout = proxy_timeout
 894        self._user_agent_pool = queue.LifoQueue()
 895        self.upload_counter = Counter()
 896        self.download_counter = Counter()
 897        self.put_counter = Counter()
 898        self.get_counter = Counter()
 899        self.hits_counter = Counter()
 900        self.misses_counter = Counter()
 901        self._storage_classes_unsupported_warning = False
 902        self._default_classes = []
 903        if num_prefetch_threads is not None:
 904            self.num_prefetch_threads = num_prefetch_threads
 905        else:
 906            self.num_prefetch_threads = 2
 907        self._prefetch_queue = None
 908        self._prefetch_threads = None
 909
 910        if local_store:
 911            self.local_store = local_store
 912            self.head = self.local_store_head
 913            self.get = self.local_store_get
 914            self.put = self.local_store_put
 915        else:
 916            self.num_retries = num_retries
 917            self.max_replicas_per_service = None
 918            if proxy:
 919                proxy_uris = proxy.split()
 920                for i in range(len(proxy_uris)):
 921                    if not proxy_uris[i].endswith('/'):
 922                        proxy_uris[i] += '/'
 923                    # URL validation
 924                    url = urllib.parse.urlparse(proxy_uris[i])
 925                    if not (url.scheme and url.netloc):
 926                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
 927                self.api_token = api_token
 928                self._gateway_services = {}
 929                self._keep_services = [{
 930                    'uuid': "00000-bi6l4-%015d" % idx,
 931                    'service_type': 'proxy',
 932                    '_service_root': uri,
 933                    } for idx, uri in enumerate(proxy_uris)]
 934                self._writable_services = self._keep_services
 935                self.using_proxy = True
 936                self._static_services_list = True
 937            else:
 938                # It's important to avoid instantiating an API client
 939                # unless we actually need one, for testing's sake.
 940                if api_client is None:
 941                    api_client = arvados.api('v1')
 942                self.api_client = api_client
 943                self.api_token = api_client.api_token
 944                self._gateway_services = {}
 945                self._keep_services = None
 946                self._writable_services = None
 947                self.using_proxy = None
 948                self._static_services_list = False
 949                try:
 950                    self._default_classes = [
 951                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
 952                except KeyError:
 953                    # We're talking to an old cluster
 954                    pass
 955
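        # Example (illustrative): talking directly to a Keep proxy without an
        # API client.  The proxy URL below is a placeholder.
        #
        #     kc = KeepClient(api_token=os.environ['ARVADOS_API_TOKEN'],
        #                     proxy='https://keep.zzzzz.example.org/')
        #     locator = kc.put(b'some data', copies=2)
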
 956    def current_timeout(self, attempt_number):
 957        """Return the appropriate timeout to use for this client.
 958
 959        The proxy timeout setting if the backend service is currently a proxy,
 960        the regular timeout setting otherwise.  The `attempt_number` indicates
 961        how many times the operation has been tried already (starting from 0
 962        for the first try), and scales the connection timeout portion of the
 963        return value accordingly.
 964
 965        """
 966        # TODO(twp): the timeout should be a property of a
 967        # KeepService, not a KeepClient. See #4488.
 968        t = self.proxy_timeout if self.using_proxy else self.timeout
 969        if len(t) == 2:
 970            return (t[0] * (1 << attempt_number), t[1])
 971        else:
 972            return (t[0] * (1 << attempt_number), t[1], t[2])
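
        # Worked example (illustrative): with the default non-proxy timeout of
        # (2, 256, 32768), only the connection timeout grows with each attempt:
        #
        #     current_timeout(0)   # (2, 256, 32768)
        #     current_timeout(2)   # (8, 256, 32768), i.e. 2 * (1 << 2)
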
 973    def _any_nondisk_services(self, service_list):
 974        return any(ks.get('service_type', 'disk') != 'disk'
 975                   for ks in service_list)
 976
 977    def build_services_list(self, force_rebuild=False):
 978        if (self._static_services_list or
 979              (self._keep_services and not force_rebuild)):
 980            return
 981        with self.lock:
 982            try:
 983                keep_services = self.api_client.keep_services().accessible()
 984            except Exception:  # API server predates Keep services.
 985                keep_services = self.api_client.keep_disks().list()
 986
 987            # Gateway services are only used when specified by UUID,
 988            # so there's nothing to gain by filtering them by
 989            # service_type.
 990            self._gateway_services = {ks['uuid']: ks for ks in
 991                                      keep_services.execute()['items']}
 992            if not self._gateway_services:
 993                raise arvados.errors.NoKeepServersError()
 994
 995            # Precompute the base URI for each service.
 996            for r in self._gateway_services.values():
 997                host = r['service_host']
 998                if not host.startswith('[') and host.find(':') >= 0:
 999                    # IPv6 URIs must be formatted like http://[::1]:80/...
1000                    host = '[' + host + ']'
1001                r['_service_root'] = "{}://{}:{:d}/".format(
1002                    'https' if r['service_ssl_flag'] else 'http',
1003                    host,
1004                    r['service_port'])
1005
1006            _logger.debug(str(self._gateway_services))
1007            self._keep_services = [
1008                ks for ks in self._gateway_services.values()
1009                if not ks.get('service_type', '').startswith('gateway:')]
1010            self._writable_services = [ks for ks in self._keep_services
1011                                       if not ks.get('read_only')]
1012
1013            # For disk type services, max_replicas_per_service is 1.
1014            # It is unknown (unlimited) for other service types.
1015            if self._any_nondisk_services(self._writable_services):
1016                self.max_replicas_per_service = None
1017            else:
1018                self.max_replicas_per_service = 1
1019
1020    def _service_weight(self, data_hash, service_uuid):
1021        """Compute the weight of a Keep service endpoint for a data
1022        block with a known hash.
1023
1024        The weight is md5(h + u) where u is the last 15 characters of
1025        the service endpoint's UUID.
1026        """
1027        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1028
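        # Example (illustrative): this is rendezvous (highest-random-weight)
        # hashing: every client computes the same weights, so all clients
        # probe services for a given block in the same order.  Sample values:
        #
        #     h = 'acbd18db4cc2f85cedef654fccc4a4d8'
        #     u = 'zzzzz-bi6l4-000000000000001'
        #     weight = hashlib.md5((h + u[-15:]).encode()).hexdigest()
        #     # services are probed in descending order of this weight
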
1029    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1030        """Return an array of Keep service endpoints, in the order in
1031        which they should be probed when reading or writing data with
1032        the given hash+hints.
1033        """
1034        self.build_services_list(force_rebuild)
1035
1036        sorted_roots = []
1037        # Use the services indicated by the given +K@... remote
1038        # service hints, if any are present and can be resolved to a
1039        # URI.
1040        for hint in locator.hints:
1041            if hint.startswith('K@'):
1042                if len(hint) == 7:
1043                    sorted_roots.append(
1044                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1045                elif len(hint) == 29:
1046                    svc = self._gateway_services.get(hint[2:])
1047                    if svc:
1048                        sorted_roots.append(svc['_service_root'])
1049
1050        # Sort the available local services by weight (heaviest first)
1051        # for this locator, and return their service_roots (base URIs)
1052        # in that order.
1053        use_services = self._keep_services
1054        if need_writable:
1055            use_services = self._writable_services
1056        self.using_proxy = self._any_nondisk_services(use_services)
1057        sorted_roots.extend([
1058            svc['_service_root'] for svc in sorted(
1059                use_services,
1060                reverse=True,
1061                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1062        _logger.debug("{}: {}".format(locator, sorted_roots))
1063        return sorted_roots
1064
1065    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1066        # roots_map is a dictionary, mapping Keep service root strings
1067        # to KeepService objects.  Poll for Keep services, and add any
1068        # new ones to roots_map.  Return the current list of local
1069        # root strings.
1070        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1071        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1072        for root in local_roots:
1073            if root not in roots_map:
1074                roots_map[root] = self.KeepService(
1075                    root, self._user_agent_pool,
1076                    upload_counter=self.upload_counter,
1077                    download_counter=self.download_counter,
1078                    headers=headers,
1079                    insecure=self.insecure)
1080        return local_roots
1081
1082    @staticmethod
1083    def _check_loop_result(result):
1084        # KeepClient RetryLoops should save results as a 2-tuple: the
1085        # actual result of the request, and the number of servers available
1086        # to receive the request this round.
1087        # This method returns True if there's a real result, False if
1088        # there are no more servers available, otherwise None.
1089        if isinstance(result, Exception):
1090            return None
1091        result, tried_server_count = result
1092        if (result is not None) and (result is not False):
1093            return True
1094        elif tried_server_count < 1:
1095            _logger.info("No more Keep services to try; giving up")
1096            return False
1097        else:
1098            return None
1099
1100    def get_from_cache(self, loc_s):
1101        """Fetch a block only if is in the cache, otherwise return None."""
1102        locator = KeepLocator(loc_s)
1103        slot = self.block_cache.get(locator.md5sum)
1104        if slot is not None and slot.ready.is_set():
1105            return slot.get()
1106        else:
1107            return None
1108
1109    def refresh_signature(self, loc):
1110        """Ask Keep to get the remote block and return its local signature"""
1111        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1112        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1113
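        # Example (illustrative): asking Keep to fetch a remote block and
        # re-sign it locally.  'remote_locator' is a hypothetical signed
        # locator from another cluster.
        #
        #     kc = KeepClient()
        #     local_locator = kc.refresh_signature(remote_locator)
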
1114    @retry.retry_method
1115    def head(self, loc_s, **kwargs):
1116        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1117
1118    @retry.retry_method
1119    def get(self, loc_s, **kwargs):
1120        return self._get_or_head(loc_s, method="GET", **kwargs)
1121
1122    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1123        """Get data from Keep.
1124
1125        This method fetches one or more blocks of data from Keep.  It
1124        sends a request to each Keep service registered with the API
1127        server (or the proxy provided when this client was
1128        instantiated), then each service named in location hints, in
1129        sequence.  As soon as one service provides the data, it's
1130        returned.
1131
1132        Arguments:
1133        * loc_s: A string of one or more comma-separated locators to fetch.
1134          This method returns the concatenation of these blocks.
1135        * num_retries: The number of times to retry GET requests to
1136          *each* Keep server if it returns temporary failures, with
1137          exponential backoff.  Note that, in each loop, the method may try
1138          to fetch data from every available Keep service, along with any
1139          that are named in location hints in the locator.  The default value
1140          is set when the KeepClient is initialized.
1141        """
1142        if ',' in loc_s:
1143            return ''.join(self.get(x) for x in loc_s.split(','))
1144
1145        self.get_counter.add(1)
1146
1147        request_id = (request_id or
1148                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1149                      arvados.util.new_request_id())
1150        if headers is None:
1151            headers = {}
1152        headers['X-Request-Id'] = request_id
1153
1154        slot = None
1155        blob = None
1156        try:
1157            locator = KeepLocator(loc_s)
1158            if method == "GET":
1159                while slot is None:
1160                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
1161                    if first:
1162                        # Fresh and empty "first time it is used" slot
1163                        break
1164                    if prefetch:
1165                        # This is a prefetch request to fill in the
1166                        # cache; we don't need to wait for the result,
1167                        # so if the fetch is already in flight, return
1168                        # immediately.  Clear 'slot' to prevent the
1169                        # finally block from calling slot.set().
1170                        if slot.ready.is_set():
1171                            slot.get()
1172                        slot = None
1173                        return None
1174
1175                    blob = slot.get()
1176                    if blob is not None:
1177                        self.hits_counter.add(1)
1178                        return blob
1179
1180                    # If blob is None, this means either
1181                    #
1182                    # (a) another thread was fetching this block and
1183                    # failed with an error or
1184                    #
1185                    # (b) cache thrashing caused the slot to be
1186                    # evicted (content set to None) by another thread
1187                    # between the call to reserve_cache() and get().
1188                    #
1189                    # We'll handle these cases by reserving a new slot
1190                    # and then doing a full GET request.
1191                    slot = None
1192
1193            self.misses_counter.add(1)
1194
1195            # If the locator has hints specifying a prefix (indicating a
1196            # remote keepproxy) or the UUID of a local gateway service,
1197            # read data from the indicated service(s) instead of the usual
1198            # list of local disk services.
1199            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1200                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1201            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1202                               for hint in locator.hints if (
1203                                       hint.startswith('K@') and
1204                                       len(hint) == 29 and
1205                                       self._gateway_services.get(hint[2:])
1206                                       )])
1207            # Map root URLs to their KeepService objects.
1208            roots_map = {
1209                root: self.KeepService(root, self._user_agent_pool,
1210                                       upload_counter=self.upload_counter,
1211                                       download_counter=self.download_counter,
1212                                       headers=headers,
1213                                       insecure=self.insecure)
1214                for root in hint_roots
1215            }
1216
1217            # See #3147 for a discussion of the loop implementation.  Highlights:
1218            # * Refresh the list of Keep services after each failure, in case
1219            #   it's being updated.
1220            # * Retry until we succeed, we're out of retries, or every available
1221            #   service has returned permanent failure.
1222            sorted_roots = []
1223            roots_map = {}
1224            loop = retry.RetryLoop(num_retries, self._check_loop_result,
1225                                   backoff_start=2)
1226            for tries_left in loop:
1227                try:
1228                    sorted_roots = self.map_new_services(
1229                        roots_map, locator,
1230                        force_rebuild=(tries_left < num_retries),
1231                        need_writable=False,
1232                        headers=headers)
1233                except Exception as error:
1234                    loop.save_result(error)
1235                    continue
1236
1237                # Query KeepService objects that haven't returned
1238                # permanent failure, in our specified shuffle order.
1239                services_to_try = [roots_map[root]
1240                                   for root in sorted_roots
1241                                   if roots_map[root].usable()]
1242                for keep_service in services_to_try:
1243                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1244                    if blob is not None:
1245                        break
1246                loop.save_result((blob, len(services_to_try)))
1247
1248            # Always cache the result, then return it if we succeeded.
1249            if loop.success():
1250                return blob
1251        finally:
1252            if slot is not None:
1253                self.block_cache.set(slot, blob)
1254
1255        # Q: Including 403 is necessary for the Keep tests to continue
1256        # passing, but maybe they should expect KeepReadError instead?
1257        not_founds = sum(1 for key in sorted_roots
1258                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1259        service_errors = ((key, roots_map[key].last_result()['error'])
1260                          for key in sorted_roots)
1261        if not roots_map:
1262            raise arvados.errors.KeepReadError(
1263                "[{}] failed to read {}: no Keep services available ({})".format(
1264                    request_id, loc_s, loop.last_result()))
1265        elif not_founds == len(sorted_roots):
1266            raise arvados.errors.NotFoundError(
1267                "[{}] {} not found".format(request_id, loc_s), service_errors)
1268        else:
1269            raise arvados.errors.KeepReadError(
1270                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1271
1272    @retry.retry_method
1273    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1274        """Save data in Keep.
1275
1276        This method will get a list of Keep services from the API server, and
1277        send the data to each one simultaneously in a new thread.  Once the
1278        uploads are finished, if enough copies are saved, this method returns
1279        the most recent HTTP response body.  If requests fail to upload
1280        enough copies, this method raises KeepWriteError.
1281
1282        Arguments:
1283        * data: The string of data to upload.
1284        * copies: The number of copies that the user requires be saved.
1285          Default 2.
1286        * num_retries: The number of times to retry PUT requests to
1287          *each* Keep server if it returns temporary failures, with
1288          exponential backoff.  The default value is set when the
1289          KeepClient is initialized.
1290        * classes: An optional list of storage class names where copies should
1291          be written.
1292        """
1293
1294        classes = classes or self._default_classes
1295
1296        if not isinstance(data, bytes):
1297            data = data.encode()
1298
1299        self.put_counter.add(1)
1300
1301        data_hash = hashlib.md5(data).hexdigest()
1302        loc_s = data_hash + '+' + str(len(data))
1303        if copies < 1:
1304            return loc_s
1305        locator = KeepLocator(loc_s)
1306
1307        request_id = (request_id or
1308                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1309                      arvados.util.new_request_id())
1310        headers = {
1311            'X-Request-Id': request_id,
1312            'X-Keep-Desired-Replicas': str(copies),
1313        }
1314        roots_map = {}
1315        loop = retry.RetryLoop(num_retries, self._check_loop_result,
1316                               backoff_start=2)
1317        done_copies = 0
1318        done_classes = []
1319        for tries_left in loop:
1320            try:
1321                sorted_roots = self.map_new_services(
1322                    roots_map, locator,
1323                    force_rebuild=(tries_left < num_retries),
1324                    need_writable=True,
1325                    headers=headers)
1326            except Exception as error:
1327                loop.save_result(error)
1328                continue
1329
1330            pending_classes = []
1331            if done_classes is not None:
1332                pending_classes = list(set(classes) - set(done_classes))
1333            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1334                                                        data_hash=data_hash,
1335                                                        copies=copies - done_copies,
1336                                                        max_service_replicas=self.max_replicas_per_service,
1337                                                        timeout=self.current_timeout(num_retries - tries_left),
1338                                                        classes=pending_classes)
1339            for service_root, ks in [(root, roots_map[root])
1340                                     for root in sorted_roots]:
1341                if ks.finished():
1342                    continue
1343                writer_pool.add_task(ks, service_root)
1344            writer_pool.join()
1345            pool_copies, pool_classes = writer_pool.done()
1346            done_copies += pool_copies
1347            if (done_classes is not None) and (pool_classes is not None):
1348                done_classes += pool_classes
1349                loop.save_result(
1350                    (done_copies >= copies and set(done_classes) == set(classes),
1351                    writer_pool.total_task_nr))
1352            else:
1353                # Old keepstore contacted without storage classes support:
1354                # success is determined only by successful copies.
1355                #
1356                # Disable storage classes tracking from this point forward.
1357                if not self._storage_classes_unsupported_warning:
1358                    self._storage_classes_unsupported_warning = True
1359                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1360                done_classes = None
1361                loop.save_result(
1362                    (done_copies >= copies, writer_pool.total_task_nr))
1363
1364        if loop.success():
1365            return writer_pool.response()
1366        if not roots_map:
1367            raise arvados.errors.KeepWriteError(
1368                "[{}] failed to write {}: no Keep services available ({})".format(
1369                    request_id, data_hash, loop.last_result()))
1370        else:
1371            service_errors = ((key, roots_map[key].last_result()['error'])
1372                              for key in sorted_roots
1373                              if roots_map[key].last_result()['error'])
1374            raise arvados.errors.KeepWriteError(
1375                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1376                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1377
1378    def _block_prefetch_worker(self):
1379        """The background downloader thread."""
1380        while True:
1381            try:
1382                b = self._prefetch_queue.get()
1383                if b is None:
1384                    return
1385                self.get(b, prefetch=True)
1386            except Exception:
1387                _logger.exception("Exception doing block prefetch")
1388
1389    def _start_prefetch_threads(self):
1390        if self._prefetch_threads is None:
1391            with self.lock:
1392                if self._prefetch_threads is not None:
1393                    return
1394                self._prefetch_queue = queue.Queue()
1395                self._prefetch_threads = []
1396                for i in range(0, self.num_prefetch_threads):
1397                    thread = threading.Thread(target=self._block_prefetch_worker)
1398                    self._prefetch_threads.append(thread)
1399                    thread.daemon = True
1400                    thread.start()
1401
1402    def block_prefetch(self, locator):
1403        """Start a background download of the given block.
1404        This relies on the fact that KeepClient implements a block cache,
1405        so repeated requests for the same block will not result in repeated
1406        downloads (unless the block is evicted from the cache.)  This method
1407        does not block.
1408        """
1409
1410        if self.block_cache.get(locator) is not None:
1411            return
1412
1413        self._start_prefetch_threads()
1414        self._prefetch_queue.put(locator)
1415
1416    def stop_prefetch_threads(self):
1417        with self.lock:
1418            if self._prefetch_threads is not None:
1419                for t in self._prefetch_threads:
1420                    self._prefetch_queue.put(None)
1421                for t in self._prefetch_threads:
1422                    t.join()
1423            self._prefetch_threads = None
1424            self._prefetch_queue = None
1425
1426    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1427        """A stub for put().
1428
1429        This method is used in place of the real put() method when
1430        using local storage (see constructor's local_store argument).
1431
1432        copies and num_retries arguments are ignored: they are here
1433        only for the sake of offering the same call signature as
1434        put().
1435
1436        Data stored this way can be retrieved via local_store_get().
1437        """
1438        md5 = hashlib.md5(data).hexdigest()
1439        locator = '%s+%d' % (md5, len(data))
1440        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1441            f.write(data)
1442        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1443                  os.path.join(self.local_store, md5))
1444        return locator
1445
1446    def local_store_get(self, loc_s, num_retries=None):
1447        """Companion to local_store_put()."""
1448        try:
1449            locator = KeepLocator(loc_s)
1450        except ValueError:
1451            raise arvados.errors.NotFoundError(
1452                "Invalid data locator: '%s'" % loc_s)
1453        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1454            return b''
1455        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1456            return f.read()
1457
1458    def local_store_head(self, loc_s, num_retries=None):
1459        """Companion to local_store_put()."""
1460        try:
1461            locator = KeepLocator(loc_s)
1462        except ValueError:
1463            raise arvados.errors.NotFoundError(
1464                "Invalid data locator: '%s'" % loc_s)
1465        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1466            return True
1467        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1468            return True
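
The local_store_put/get/head stubs above take over for put(), get(), and head() when a KeepClient is built in local-store mode (the constructor's local_store argument, or the KEEP_LOCAL_STORE environment variable). A minimal sketch, assuming local-store mode needs no API client and that the directory path below is purely illustrative:

    import os
    import arvados.keep

    store_dir = '/tmp/keep-local'          # illustrative path
    os.makedirs(store_dir, exist_ok=True)

    kc = arvados.keep.KeepClient(local_store=store_dir)
    loc = kc.put(b'hello')                 # local_store_put(): writes <store_dir>/<md5>
    assert kc.get(loc) == b'hello'         # local_store_get(): reads the file back
    assert kc.head(loc) is True            # local_store_head(): existence check
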
global_client_object = None
class KeepLocator:
 51class KeepLocator(object):
 52    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
 53    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
 54
 55    def __init__(self, locator_str):
 56        self.hints = []
 57        self._perm_sig = None
 58        self._perm_expiry = None
 59        pieces = iter(locator_str.split('+'))
 60        self.md5sum = next(pieces)
 61        try:
 62            self.size = int(next(pieces))
 63        except StopIteration:
 64            self.size = None
 65        for hint in pieces:
 66            if self.HINT_RE.match(hint) is None:
 67                raise ValueError("invalid hint format: {}".format(hint))
 68            elif hint.startswith('A'):
 69                self.parse_permission_hint(hint)
 70            else:
 71                self.hints.append(hint)
 72
 73    def __str__(self):
 74        return '+'.join(
 75            str(s)
 76            for s in [self.md5sum, self.size,
 77                      self.permission_hint()] + self.hints
 78            if s is not None)
 79
 80    def stripped(self):
 81        if self.size is not None:
 82            return "%s+%i" % (self.md5sum, self.size)
 83        else:
 84            return self.md5sum
 85
 86    def _make_hex_prop(name, length):
 87        # Build and return a new property with the given name that
 88        # must be a hex string of the given length.
 89        data_name = '_{}'.format(name)
 90        def getter(self):
 91            return getattr(self, data_name)
 92        def setter(self, hex_str):
 93            if not arvados.util.is_hex(hex_str, length):
 94                raise ValueError("{} is not a {}-digit hex string: {!r}".
 95                                 format(name, length, hex_str))
 96            setattr(self, data_name, hex_str)
 97        return property(getter, setter)
 98
 99    md5sum = _make_hex_prop('md5sum', 32)
100    perm_sig = _make_hex_prop('perm_sig', 40)
101
102    @property
103    def perm_expiry(self):
104        return self._perm_expiry
105
106    @perm_expiry.setter
107    def perm_expiry(self, value):
108        if not arvados.util.is_hex(value, 1, 8):
109            raise ValueError(
110                "permission timestamp must be a hex Unix timestamp: {}".
111                format(value))
112        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
113
114    def permission_hint(self):
115        data = [self.perm_sig, self.perm_expiry]
116        if None in data:
117            return None
118        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
119        return "A{}@{:08x}".format(*data)
120
121    def parse_permission_hint(self, s):
122        try:
123            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
124        except IndexError:
125            raise ValueError("bad permission hint {}".format(s))
126
127    def permission_expired(self, as_of_dt=None):
128        if self.perm_expiry is None:
129            return False
130        elif as_of_dt is None:
131            as_of_dt = datetime.datetime.now()
132        return self.perm_expiry <= as_of_dt
KeepLocator(locator_str)
EPOCH_DATETIME = datetime.datetime(1970, 1, 1, 0, 0)
HINT_RE = re.compile('^[A-Z][A-Za-z0-9@_-]+$')
hints
md5sum
def stripped(self):
perm_sig
perm_expiry
def permission_hint(self):
def parse_permission_hint(self, s):
def permission_expired(self, as_of_dt=None):
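
Putting the members above together: a KeepLocator splits a locator string into hash, size, permission hint, and any other hints, and reassembles it on demand. A minimal sketch (the hash, signature, and timestamp are made-up values chosen only to satisfy the length checks):

    from arvados.keep import KeepLocator

    loc = KeepLocator(
        'acbd18db4cc2f85cedef654fccc4a4d8+3'
        '+A1f7e3b2a9c8d7e6f5a4b3c2d1e0f9a8b7c6d5e4f@65fe2a0c')

    loc.md5sum               # 'acbd18db4cc2f85cedef654fccc4a4d8'
    loc.size                 # 3
    loc.stripped()           # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
    loc.permission_hint()    # 'A<signature>@65fe2a0c', rebuilt from perm_sig/perm_expiry
    loc.permission_expired() # compares the hex timestamp against the current time
    str(loc)                 # the full locator string, reassembled
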
class Keep:
135class Keep(object):
136    """Simple interface to a global KeepClient object.
137
138    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
139    own API client.  The global KeepClient will build an API client from the
140    current Arvados configuration, which may not match the one you built.
141    """
142    _last_key = None
143
144    @classmethod
145    def global_client_object(cls):
146        global global_client_object
147        # Previously, KeepClient would change its behavior at runtime based
148        # on these configuration settings.  We simulate that behavior here
149        # by checking the values and returning a new KeepClient if any of
150        # them have changed.
151        key = (config.get('ARVADOS_API_HOST'),
152               config.get('ARVADOS_API_TOKEN'),
153               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
154               config.get('ARVADOS_KEEP_PROXY'),
155               os.environ.get('KEEP_LOCAL_STORE'))
156        if (global_client_object is None) or (cls._last_key != key):
157            global_client_object = KeepClient()
158            cls._last_key = key
159        return global_client_object
160
161    @staticmethod
162    def get(locator, **kwargs):
163        return Keep.global_client_object().get(locator, **kwargs)
164
165    @staticmethod
166    def put(data, **kwargs):
167        return Keep.global_client_object().put(data, **kwargs)

Simple interface to a global KeepClient object.

THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your own API client. The global KeepClient will build an API client from the current Arvados configuration, which may not match the one you built.

@classmethod
def global_client_object(cls):
@staticmethod
def get(locator, **kwargs):
@staticmethod
def put(data, **kwargs):
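
As the deprecation note says, this global interface is only kept for backward compatibility; new code should construct its own client. A minimal sketch of both styles, assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN are already configured:

    import arvados
    import arvados.keep

    # Deprecated: module-global client built from the current configuration.
    locator = arvados.keep.Keep.put(b'foo')
    data = arvados.keep.Keep.get(locator)

    # Preferred: a KeepClient tied to an API client you constructed yourself.
    api = arvados.api('v1')
    kc = arvados.keep.KeepClient(api_client=api)
    locator = kc.put(b'foo', copies=2)   # put() also accepts classes= for storage classes
    data = kc.get(locator)
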
class KeepBlockCache:
169class KeepBlockCache(object):
170    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
171        self.cache_max = cache_max
172        self._cache = collections.OrderedDict()
173        self._cache_lock = threading.Lock()
174        self._max_slots = max_slots
175        self._disk_cache = disk_cache
176        self._disk_cache_dir = disk_cache_dir
177        self._cache_updating = threading.Condition(self._cache_lock)
178
179        if self._disk_cache and self._disk_cache_dir is None:
180            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
181            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
182
183        if self._max_slots == 0:
184            if self._disk_cache:
185                # Each block uses two file descriptors, one used to
186                # open it initially and hold the flock(), and a second
187                # hidden one used by mmap().
188                #
189                # Set max slots to 1/8 of maximum file handles.  This
190                # means we'll use at most 1/4 of total file handles.
191                #
192                # NOFILE typically defaults to 1024 on Linux so this
193                # is 128 slots (256 file handles), which means we can
194                # cache up to 8 GiB of 64 MiB blocks.  This leaves
195                # 768 file handles for sockets and other stuff.
196                #
197                # When we want the ability to have more cache (e.g. in
198                # arv-mount) we'll increase rlimit before calling
199                # this.
200                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
201            else:
202                # RAM cache slots
203                self._max_slots = 512
204
205        if self.cache_max == 0:
206            if self._disk_cache:
207                fs = os.statvfs(self._disk_cache_dir)
208                # Calculation of available space incorporates existing cache usage
209                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
210                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
211                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
212                # pick smallest of:
213                # 10% of total disk size
214                # 25% of available space
215                # max_slots * 64 MiB
216                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
217            else:
218                # 256 MiB in RAM
219                self.cache_max = (256 * 1024 * 1024)
220
221        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
222
223        self.cache_total = 0
224        if self._disk_cache:
225            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
226            for slot in self._cache.values():
227                self.cache_total += slot.size()
228            self.cap_cache()
229
230    class CacheSlot(object):
231        __slots__ = ("locator", "ready", "content")
232
233        def __init__(self, locator):
234            self.locator = locator
235            self.ready = threading.Event()
236            self.content = None
237
238        def get(self):
239            self.ready.wait()
240            return self.content
241
242        def set(self, value):
243            if self.content is not None:
244                return False
245            self.content = value
246            self.ready.set()
247            return True
248
249        def size(self):
250            if self.content is None:
251                return 0
252            else:
253                return len(self.content)
254
255        def evict(self):
256            self.content = None
257
258
259    def _resize_cache(self, cache_max, max_slots):
260        # Try and make sure the contents of the cache do not exceed
261        # the supplied maximums.
262
263        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
264            return
265
266        _evict_candidates = collections.deque(self._cache.values())
267        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
268            slot = _evict_candidates.popleft()
269            if not slot.ready.is_set():
270                continue
271
272            sz = slot.size()
273            slot.evict()
274            self.cache_total -= sz
275            del self._cache[slot.locator]
276
277
278    def cap_cache(self):
279        '''Cap the cache size to self.cache_max'''
280        with self._cache_updating:
281            self._resize_cache(self.cache_max, self._max_slots)
282            self._cache_updating.notify_all()
283
284    def _get(self, locator):
285        # Test if the locator is already in the cache
286        if locator in self._cache:
287            n = self._cache[locator]
288            if n.ready.is_set() and n.content is None:
289                del self._cache[n.locator]
290                return None
291            self._cache.move_to_end(locator)
292            return n
293        if self._disk_cache:
294            # see if it exists on disk
295            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
296            if n is not None:
297                self._cache[n.locator] = n
298                self.cache_total += n.size()
299                return n
300        return None
301
302    def get(self, locator):
303        with self._cache_lock:
304            return self._get(locator)
305
306    def reserve_cache(self, locator):
307        '''Reserve a cache slot for the specified locator,
308        or return the existing slot.'''
309        with self._cache_updating:
310            n = self._get(locator)
311            if n:
312                return n, False
313            else:
314                # Add a new cache slot for the locator
315                self._resize_cache(self.cache_max, self._max_slots-1)
316                while len(self._cache) >= self._max_slots:
317                    # If there isn't a slot available, need to wait
318                    # for something to happen that releases one of the
319                    # cache slots.  Idle for 200 ms or woken up by
320                    # another thread
321                    self._cache_updating.wait(timeout=0.2)
322                    self._resize_cache(self.cache_max, self._max_slots-1)
323
324                if self._disk_cache:
325                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
326                else:
327                    n = KeepBlockCache.CacheSlot(locator)
328                self._cache[n.locator] = n
329                return n, True
330
331    def set(self, slot, blob):
332        try:
333            if slot.set(blob):
334                self.cache_total += slot.size()
335            return
336        except OSError as e:
337            if e.errno == errno.ENOMEM:
338                # Reduce max slots to current - 4, cap cache and retry
339                with self._cache_lock:
340                    self._max_slots = max(4, len(self._cache) - 4)
341            elif e.errno == errno.ENOSPC:
342                # Reduce disk max space to current - 256 MiB, cap cache and retry
343                with self._cache_lock:
344                    sm = sum(st.size() for st in self._cache.values())
345                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
346            elif e.errno == errno.ENODEV:
347                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
348        except Exception as e:
349            pass
350        finally:
351            # Check if we should evict things from the cache.  Either
352            # because we added a new thing or there was an error and
353            # we possibly adjusted the limits down, so we might need
354            # to push something out.
355            self.cap_cache()
356
357        try:
358            # Only gets here if there was an error the first time. The
359            # exception handler adjusts limits downward in some cases
360            # to free up resources, which would make the operation
361            # succeed.
362            if slot.set(blob):
363                self.cache_total += slot.size()
364        except Exception as e:
365            # It failed again.  Give up.
366            slot.set(None)
367            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
368
369        self.cap_cache()
KeepBlockCache(cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None)
cache_max
cache_total
def cap_cache(self):

Cap the cache size to self.cache_max

def get(self, locator):
def reserve_cache(self, locator):

Reserve a cache slot for the specified locator, or return the existing slot.

def set(self, slot, blob):
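
A KeepBlockCache can be created once and shared by several clients. A sketch of wiring a disk-backed cache into a KeepClient (the size is an illustrative choice; with all defaults the cache computes its own limits as shown in __init__ above):

    import arvados
    import arvados.keep

    # Cap the on-disk cache at 512 MiB; blocks land under ~/.cache/arvados/keep.
    cache = arvados.keep.KeepBlockCache(
        cache_max=512 * 1024 * 1024,
        disk_cache=True)

    kc = arvados.keep.KeepClient(api_client=arvados.api('v1'), block_cache=cache)
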
class KeepBlockCache.CacheSlot:
230    class CacheSlot(object):
231        __slots__ = ("locator", "ready", "content")
232
233        def __init__(self, locator):
234            self.locator = locator
235            self.ready = threading.Event()
236            self.content = None
237
238        def get(self):
239            self.ready.wait()
240            return self.content
241
242        def set(self, value):
243            if self.content is not None:
244                return False
245            self.content = value
246            self.ready.set()
247            return True
248
249        def size(self):
250            if self.content is None:
251                return 0
252            else:
253                return len(self.content)
254
255        def evict(self):
256            self.content = None
KeepBlockCache.CacheSlot(locator)
locator
ready
content
def get(self):
def set(self, value):
def size(self):
def evict(self):
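
The slot protocol above is what KeepClient.get() relies on internally: the caller that receives first=True from reserve_cache() is responsible for filling the slot, and every other caller blocks in slot.get() until set() is invoked. A sketch of the same handshake outside the client; fetch_block() is a hypothetical stand-in for a real download:

    import hashlib
    import arvados.keep

    def fetch_block(md5sum):
        # Hypothetical stand-in for fetching the block from a Keep service.
        return b'example'

    cache = arvados.keep.KeepBlockCache()
    md5 = hashlib.md5(b'example').hexdigest()

    slot, first = cache.reserve_cache(md5)
    if first:
        blob = fetch_block(md5)
        cache.set(slot, blob)    # wakes up any thread waiting in slot.get()
    else:
        blob = slot.get()        # blocks until the reserving thread calls set()
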
class Counter:
371class Counter(object):
372    def __init__(self, v=0):
373        self._lk = threading.Lock()
374        self._val = v
375
376    def add(self, v):
377        with self._lk:
378            self._val += v
379
380    def get(self):
381        with self._lk:
382            return self._val
Counter(v=0)
def add(self, v):
def get(self):
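
Counter is the small thread-safe tally behind the client's transfer statistics. A sketch of reading them after some traffic; the attribute names follow the references in the source above (upload_counter, download_counter, hits_counter, misses_counter):

    import arvados
    import arvados.keep

    kc = arvados.keep.KeepClient(api_client=arvados.api('v1'))
    locator = kc.put(b'example data')
    kc.get(locator)

    print(kc.upload_counter.get(), 'bytes uploaded')
    print(kc.download_counter.get(), 'bytes downloaded')
    print(kc.hits_counter.get(), 'cache hits,', kc.misses_counter.get(), 'misses')
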
class KeepClient:
 385class KeepClient(object):
 386    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
 387    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
 388
 389    class KeepService(PyCurlHelper):
 390        """Make requests to a single Keep service, and track results.
 391
 392        A KeepService is intended to last long enough to perform one
 393        transaction (GET or PUT) against one Keep service. This can
 394        involve calling either get() or put() multiple times in order
 395        to retry after transient failures. However, calling both get()
 396        and put() on a single instance -- or using the same instance
 397        to access two different Keep services -- will not produce
 398        sensible behavior.
 399        """
 400
 401        HTTP_ERRORS = (
 402            socket.error,
 403            ssl.SSLError,
 404            arvados.errors.HttpError,
 405        )
 406
 407        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
 408                     upload_counter=None,
 409                     download_counter=None,
 410                     headers={},
 411                     insecure=False):
 412            super(KeepClient.KeepService, self).__init__()
 413            self.root = root
 414            self._user_agent_pool = user_agent_pool
 415            self._result = {'error': None}
 416            self._usable = True
 417            self._session = None
 418            self._socket = None
 419            self.get_headers = {'Accept': 'application/octet-stream'}
 420            self.get_headers.update(headers)
 421            self.put_headers = headers
 422            self.upload_counter = upload_counter
 423            self.download_counter = download_counter
 424            self.insecure = insecure
 425
 426        def usable(self):
 427            """Is it worth attempting a request?"""
 428            return self._usable
 429
 430        def finished(self):
 431            """Did the request succeed or encounter permanent failure?"""
 432            return self._result['error'] == False or not self._usable
 433
 434        def last_result(self):
 435            return self._result
 436
 437        def _get_user_agent(self):
 438            try:
 439                return self._user_agent_pool.get(block=False)
 440            except queue.Empty:
 441                return pycurl.Curl()
 442
 443        def _put_user_agent(self, ua):
 444            try:
 445                ua.reset()
 446                self._user_agent_pool.put(ua, block=False)
 447            except:
 448                ua.close()
 449
 450        def get(self, locator, method="GET", timeout=None):
 451            # locator is a KeepLocator object.
 452            url = self.root + str(locator)
 453            _logger.debug("Request: %s %s", method, url)
 454            curl = self._get_user_agent()
 455            ok = None
 456            try:
 457                with timer.Timer() as t:
 458                    self._headers = {}
 459                    response_body = BytesIO()
 460                    curl.setopt(pycurl.NOSIGNAL, 1)
 461                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 462                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 463                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 464                    curl.setopt(pycurl.HTTPHEADER, [
 465                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
 466                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 467                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 468                    if self.insecure:
 469                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 470                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 471                    else:
 472                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 473                    if method == "HEAD":
 474                        curl.setopt(pycurl.NOBODY, True)
 475                    else:
 476                        curl.setopt(pycurl.HTTPGET, True)
 477                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 478
 479                    try:
 480                        curl.perform()
 481                    except Exception as e:
 482                        raise arvados.errors.HttpError(0, str(e))
 483                    finally:
 484                        if self._socket:
 485                            self._socket.close()
 486                            self._socket = None
 487                    self._result = {
 488                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 489                        'body': response_body.getvalue(),
 490                        'headers': self._headers,
 491                        'error': False,
 492                    }
 493
 494                ok = retry.check_http_response_success(self._result['status_code'])
 495                if not ok:
 496                    self._result['error'] = arvados.errors.HttpError(
 497                        self._result['status_code'],
 498                        self._headers.get('x-status-line', 'Error'))
 499            except self.HTTP_ERRORS as e:
 500                self._result = {
 501                    'error': e,
 502                }
 503            self._usable = ok != False
 504            if self._result.get('status_code', None):
 505                # The client worked well enough to get an HTTP status
 506                # code, so presumably any problems are just on the
 507                # server side and it's OK to reuse the client.
 508                self._put_user_agent(curl)
 509            else:
 510                # Don't return this client to the pool, in case it's
 511                # broken.
 512                curl.close()
 513            if not ok:
 514                _logger.debug("Request fail: GET %s => %s: %s",
 515                              url, type(self._result['error']), str(self._result['error']))
 516                return None
 517            if method == "HEAD":
 518                _logger.info("HEAD %s: %s bytes",
 519                         self._result['status_code'],
 520                         self._result.get('content-length'))
 521                if self._result['headers'].get('x-keep-locator'):
 522                    # This is a response to a remote block copy request, return
 523                    # the local copy block locator.
 524                    return self._result['headers'].get('x-keep-locator')
 525                return True
 526
 527            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
 528                         self._result['status_code'],
 529                         len(self._result['body']),
 530                         t.msecs,
 531                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 532
 533            if self.download_counter:
 534                self.download_counter.add(len(self._result['body']))
 535            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
 536            if resp_md5 != locator.md5sum:
 537                _logger.warning("Checksum fail: md5(%s) = %s",
 538                                url, resp_md5)
 539                self._result['error'] = arvados.errors.HttpError(
 540                    0, 'Checksum fail')
 541                return None
 542            return self._result['body']
 543
 544        def put(self, hash_s, body, timeout=None, headers={}):
 545            put_headers = copy.copy(self.put_headers)
 546            put_headers.update(headers)
 547            url = self.root + hash_s
 548            _logger.debug("Request: PUT %s", url)
 549            curl = self._get_user_agent()
 550            ok = None
 551            try:
 552                with timer.Timer() as t:
 553                    self._headers = {}
 554                    body_reader = BytesIO(body)
 555                    response_body = BytesIO()
 556                    curl.setopt(pycurl.NOSIGNAL, 1)
 557                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 558                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 559                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 560                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
 561                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
 562                    # response) instead of sending the request body immediately.
 563                    # This allows the server to reject the request if the request
 564                    # is invalid or the server is read-only, without waiting for
 565                    # the client to send the entire block.
 566                    curl.setopt(pycurl.UPLOAD, True)
 567                    curl.setopt(pycurl.INFILESIZE, len(body))
 568                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
 569                    curl.setopt(pycurl.HTTPHEADER, [
 570                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
 571                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 572                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 573                    if self.insecure:
 574                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 575                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 576                    else:
 577                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 578                    self._setcurltimeouts(curl, timeout)
 579                    try:
 580                        curl.perform()
 581                    except Exception as e:
 582                        raise arvados.errors.HttpError(0, str(e))
 583                    finally:
 584                        if self._socket:
 585                            self._socket.close()
 586                            self._socket = None
 587                    self._result = {
 588                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 589                        'body': response_body.getvalue().decode('utf-8'),
 590                        'headers': self._headers,
 591                        'error': False,
 592                    }
 593                ok = retry.check_http_response_success(self._result['status_code'])
 594                if not ok:
 595                    self._result['error'] = arvados.errors.HttpError(
 596                        self._result['status_code'],
 597                        self._headers.get('x-status-line', 'Error'))
 598            except self.HTTP_ERRORS as e:
 599                self._result = {
 600                    'error': e,
 601                }
 602            self._usable = ok != False # still usable if ok is True or None
 603            if self._result.get('status_code', None):
 604                # Client is functional. See comment in get().
 605                self._put_user_agent(curl)
 606            else:
 607                curl.close()
 608            if not ok:
 609                _logger.debug("Request fail: PUT %s => %s: %s",
 610                              url, type(self._result['error']), str(self._result['error']))
 611                return False
 612            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
 613                         self._result['status_code'],
 614                         len(body),
 615                         t.msecs,
 616                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
 617            if self.upload_counter:
 618                self.upload_counter.add(len(body))
 619            return True
 620
 621
 622    class KeepWriterQueue(queue.Queue):
 623        def __init__(self, copies, classes=[]):
 624            queue.Queue.__init__(self) # Old-style superclass
 625            self.wanted_copies = copies
 626            self.wanted_storage_classes = classes
 627            self.successful_copies = 0
 628            self.confirmed_storage_classes = {}
 629            self.response = None
 630            self.storage_classes_tracking = True
 631            self.queue_data_lock = threading.RLock()
 632            self.pending_tries = max(copies, len(classes))
 633            self.pending_tries_notification = threading.Condition()
 634
 635        def write_success(self, response, replicas_nr, classes_confirmed):
 636            with self.queue_data_lock:
 637                self.successful_copies += replicas_nr
 638                if classes_confirmed is None:
 639                    self.storage_classes_tracking = False
 640                elif self.storage_classes_tracking:
 641                    for st_class, st_copies in classes_confirmed.items():
 642                        try:
 643                            self.confirmed_storage_classes[st_class] += st_copies
 644                        except KeyError:
 645                            self.confirmed_storage_classes[st_class] = st_copies
 646                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
 647                self.response = response
 648            with self.pending_tries_notification:
 649                self.pending_tries_notification.notify_all()
 650
 651        def write_fail(self, ks):
 652            with self.pending_tries_notification:
 653                self.pending_tries += 1
 654                self.pending_tries_notification.notify()
 655
 656        def pending_copies(self):
 657            with self.queue_data_lock:
 658                return self.wanted_copies - self.successful_copies
 659
 660        def satisfied_classes(self):
 661            with self.queue_data_lock:
 662                if not self.storage_classes_tracking:
 663                    # Notifies disabled storage classes expectation to
 664                    # the outer loop.
 665                    return None
 666            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
 667
 668        def pending_classes(self):
 669            with self.queue_data_lock:
 670                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
 671                    return []
 672                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
 673                for st_class, st_copies in self.confirmed_storage_classes.items():
 674                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
 675                        unsatisfied_classes.remove(st_class)
 676                return unsatisfied_classes
 677
 678        def get_next_task(self):
 679            with self.pending_tries_notification:
 680                while True:
 681                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
 682                        # This notify_all() is unnecessary --
 683                        # write_success() already called notify_all()
 684                        # when pending<1 became true, so it's not
 685                        # possible for any other thread to be in
 686                        # wait() now -- but it's cheap insurance
 687                        # against deadlock so we do it anyway:
 688                        self.pending_tries_notification.notify_all()
 689                        # Drain the queue and then raise Queue.Empty
 690                        while True:
 691                            self.get_nowait()
 692                            self.task_done()
 693                    elif self.pending_tries > 0:
 694                        service, service_root = self.get_nowait()
 695                        if service.finished():
 696                            self.task_done()
 697                            continue
 698                        self.pending_tries -= 1
 699                        return service, service_root
 700                    elif self.empty():
 701                        self.pending_tries_notification.notify_all()
 702                        raise queue.Empty
 703                    else:
 704                        self.pending_tries_notification.wait()
 705
 706
 707    class KeepWriterThreadPool(object):
 708        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
 709            self.total_task_nr = 0
 710            if (not max_service_replicas) or (max_service_replicas >= copies):
 711                num_threads = 1
 712            else:
 713                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
 714            _logger.debug("Pool max threads is %d", num_threads)
 715            self.workers = []
 716            self.queue = KeepClient.KeepWriterQueue(copies, classes)
 717            # Create workers
 718            for _ in range(num_threads):
 719                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
 720                self.workers.append(w)
 721
 722        def add_task(self, ks, service_root):
 723            self.queue.put((ks, service_root))
 724            self.total_task_nr += 1
 725
 726        def done(self):
 727            return self.queue.successful_copies, self.queue.satisfied_classes()
 728
 729        def join(self):
 730            # Start workers
 731            for worker in self.workers:
 732                worker.start()
 733            # Wait for finished work
 734            self.queue.join()
 735
 736        def response(self):
 737            return self.queue.response
 738
 739
 740    class KeepWriterThread(threading.Thread):
 741        class TaskFailed(RuntimeError): pass
 742
 743        def __init__(self, queue, data, data_hash, timeout=None):
 744            super(KeepClient.KeepWriterThread, self).__init__()
 745            self.timeout = timeout
 746            self.queue = queue
 747            self.data = data
 748            self.data_hash = data_hash
 749            self.daemon = True
 750
 751        def run(self):
 752            while True:
 753                try:
 754                    service, service_root = self.queue.get_next_task()
 755                except queue.Empty:
 756                    return
 757                try:
 758                    locator, copies, classes = self.do_task(service, service_root)
 759                except Exception as e:
 760                    if not isinstance(e, self.TaskFailed):
 761                        _logger.exception("Exception in KeepWriterThread")
 762                    self.queue.write_fail(service)
 763                else:
 764                    self.queue.write_success(locator, copies, classes)
 765                finally:
 766                    self.queue.task_done()
 767
 768        def do_task(self, service, service_root):
 769            classes = self.queue.pending_classes()
 770            headers = {}
 771            if len(classes) > 0:
 772                classes.sort()
 773                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
 774            success = bool(service.put(self.data_hash,
 775                                        self.data,
 776                                        timeout=self.timeout,
 777                                        headers=headers))
 778            result = service.last_result()
 779
 780            if not success:
 781                if result.get('status_code'):
 782                    _logger.debug("Request fail: PUT %s => %s %s",
 783                                  self.data_hash,
 784                                  result.get('status_code'),
 785                                  result.get('body'))
 786                raise self.TaskFailed()
 787
 788            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
 789                          str(threading.current_thread()),
 790                          self.data_hash,
 791                          len(self.data),
 792                          service_root)
 793            try:
 794                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
 795            except (KeyError, ValueError):
 796                replicas_stored = 1
 797
 798            classes_confirmed = {}
 799            try:
 800                scch = result['headers']['x-keep-storage-classes-confirmed']
 801                for confirmation in scch.replace(' ', '').split(','):
 802                    if '=' in confirmation:
 803                        stored_class, stored_copies = confirmation.split('=')[:2]
 804                        classes_confirmed[stored_class] = int(stored_copies)
 805            except (KeyError, ValueError):
 806                # Storage classes confirmed header missing or corrupt
 807                classes_confirmed = None
 808
 809            return result['body'].strip(), replicas_stored, classes_confirmed
 810
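
The header parsing in do_task() above is plain string splitting; keepstore reports confirmed storage classes as comma-separated name=count pairs. A minimal standalone sketch with a made-up header value:

def parse_classes_confirmed(header_value):
    # "default=2, archive=1" -> {'default': 2, 'archive': 1}.
    # Returns None when the header is missing or malformed, mirroring the
    # fallback in do_task() above.
    if header_value is None:
        return None
    try:
        confirmed = {}
        for item in header_value.replace(' ', '').split(','):
            if '=' in item:
                name, count = item.split('=')[:2]
                confirmed[name] = int(count)
        return confirmed
    except ValueError:
        return None

print(parse_classes_confirmed("default=2, archive=1"))  # {'default': 2, 'archive': 1}
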
 811
 812    def __init__(self, api_client=None, proxy=None,
 813                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
 814                 api_token=None, local_store=None, block_cache=None,
 815                 num_retries=10, session=None, num_prefetch_threads=None):
 816        """Initialize a new KeepClient.
 817
 818        Arguments:
 819        :api_client:
 820          The API client to use to find Keep services.  If not
 821          provided, KeepClient will build one from available Arvados
 822          configuration.
 823
 824        :proxy:
 825          If specified, this KeepClient will send requests to this Keep
 826          proxy.  Otherwise, KeepClient will fall back to the setting of the
 827          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
 828          If you want to ensure KeepClient does not use a proxy, pass in an
 829          empty string.
 830
 831        :timeout:
 832          The initial timeout (in seconds) for HTTP requests to Keep
 833          non-proxy servers.  A tuple of three floats is interpreted as
 834          (connection_timeout, read_timeout, minimum_bandwidth). A connection
 835          will be aborted if the average traffic rate falls below
 836          minimum_bandwidth bytes per second over an interval of read_timeout
 837          seconds. Because timeouts are often a result of transient server
 838          load, the actual connection timeout will be increased by a factor
 839          of two on each retry.
 840          Default: (2, 256, 32768).
 841
 842        :proxy_timeout:
 843          The initial timeout (in seconds) for HTTP requests to
 844          Keep proxies. A tuple of three floats is interpreted as
 845          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
 846          described above for adjusting connection timeouts on retry also
 847          applies.
 848          Default: (20, 256, 32768).
 849
 850        :api_token:
 851          If you're not using an API client, but only talking
 852          directly to a Keep proxy, this parameter specifies an API token
 853          to authenticate Keep requests.  It is an error to specify both
 854          api_client and api_token.  If you specify neither, KeepClient
 855          will use one available from the Arvados configuration.
 856
 857        :local_store:
 858          If specified, this KeepClient will bypass Keep
 859          services, and save data to the named directory.  If unspecified,
 860          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
 861          environment variable.  If you want to ensure KeepClient does not
 862          use local storage, pass in an empty string.  This is primarily
 863          intended to mock a server for testing.
 864
 865        :num_retries:
 866          The default number of times to retry failed requests.
 867          This will be used as the default num_retries value when get() and
 868          put() are called.  Default 10.
 869        """
 870        self.lock = threading.Lock()
 871        if proxy is None:
 872            if config.get('ARVADOS_KEEP_SERVICES'):
 873                proxy = config.get('ARVADOS_KEEP_SERVICES')
 874            else:
 875                proxy = config.get('ARVADOS_KEEP_PROXY')
 876        if api_token is None:
 877            if api_client is None:
 878                api_token = config.get('ARVADOS_API_TOKEN')
 879            else:
 880                api_token = api_client.api_token
 881        elif api_client is not None:
 882            raise ValueError(
 883                "can't build KeepClient with both API client and token")
 884        if local_store is None:
 885            local_store = os.environ.get('KEEP_LOCAL_STORE')
 886
 887        if api_client is None:
 888            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
 889        else:
 890            self.insecure = api_client.insecure
 891
 892        self.block_cache = block_cache if block_cache else KeepBlockCache()
 893        self.timeout = timeout
 894        self.proxy_timeout = proxy_timeout
 895        self._user_agent_pool = queue.LifoQueue()
 896        self.upload_counter = Counter()
 897        self.download_counter = Counter()
 898        self.put_counter = Counter()
 899        self.get_counter = Counter()
 900        self.hits_counter = Counter()
 901        self.misses_counter = Counter()
 902        self._storage_classes_unsupported_warning = False
 903        self._default_classes = []
 904        if num_prefetch_threads is not None:
 905            self.num_prefetch_threads = num_prefetch_threads
 906        else:
 907            self.num_prefetch_threads = 2
 908        self._prefetch_queue = None
 909        self._prefetch_threads = None
 910
 911        if local_store:
 912            self.local_store = local_store
 913            self.head = self.local_store_head
 914            self.get = self.local_store_get
 915            self.put = self.local_store_put
 916        else:
 917            self.num_retries = num_retries
 918            self.max_replicas_per_service = None
 919            if proxy:
 920                proxy_uris = proxy.split()
 921                for i in range(len(proxy_uris)):
 922                    if not proxy_uris[i].endswith('/'):
 923                        proxy_uris[i] += '/'
 924                    # URL validation
 925                    url = urllib.parse.urlparse(proxy_uris[i])
 926                    if not (url.scheme and url.netloc):
 927                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
 928                self.api_token = api_token
 929                self._gateway_services = {}
 930                self._keep_services = [{
 931                    'uuid': "00000-bi6l4-%015d" % idx,
 932                    'service_type': 'proxy',
 933                    '_service_root': uri,
 934                    } for idx, uri in enumerate(proxy_uris)]
 935                self._writable_services = self._keep_services
 936                self.using_proxy = True
 937                self._static_services_list = True
 938            else:
 939                # It's important to avoid instantiating an API client
 940                # unless we actually need one, for testing's sake.
 941                if api_client is None:
 942                    api_client = arvados.api('v1')
 943                self.api_client = api_client
 944                self.api_token = api_client.api_token
 945                self._gateway_services = {}
 946                self._keep_services = None
 947                self._writable_services = None
 948                self.using_proxy = None
 949                self._static_services_list = False
 950                try:
 951                    self._default_classes = [
 952                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
 953                except KeyError:
 954                    # We're talking to an old cluster
 955                    pass
 956
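
A construction sketch, assuming an environment already configured with ARVADOS_API_HOST and ARVADOS_API_TOKEN; the timeout tuple here only illustrates the (connection, read, minimum-bandwidth) form described in the docstring:

import arvados
import arvados.keep

api = arvados.api('v1')
keep_client = arvados.keep.KeepClient(
    api_client=api,
    timeout=(2, 512, 16384),   # connect seconds, read seconds, minimum bytes/sec
    num_retries=5,
)
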
 957    def current_timeout(self, attempt_number):
 958        """Return the appropriate timeout to use for this client.
 959
 960        The proxy timeout setting if the backend service is currently a proxy,
 961        the regular timeout setting otherwise.  The `attempt_number` indicates
 962        how many times the operation has been tried already (starting from 0
 963        for the first try), and scales the connection timeout portion of the
 964        return value accordingly.
 965
 966        """
 967        # TODO(twp): the timeout should be a property of a
 968        # KeepService, not a KeepClient. See #4488.
 969        t = self.proxy_timeout if self.using_proxy else self.timeout
 970        if len(t) == 2:
 971            return (t[0] * (1 << attempt_number), t[1])
 972        else:
 973            return (t[0] * (1 << attempt_number), t[1], t[2])
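
Only the first element of the tuple is scaled: the connection timeout doubles on every attempt via the 1 << attempt_number shift, while the read timeout and minimum bandwidth stay fixed. For the default non-proxy tuple this works out as follows (illustrative arithmetic only):

for attempt in range(3):
    # attempt 0 -> (2, 256, 32768); attempt 1 -> (4, 256, 32768); attempt 2 -> (8, 256, 32768)
    print((2 * (1 << attempt), 256, 32768))
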
 974    def _any_nondisk_services(self, service_list):
 975        return any(ks.get('service_type', 'disk') != 'disk'
 976                   for ks in service_list)
 977
 978    def build_services_list(self, force_rebuild=False):
 979        if (self._static_services_list or
 980              (self._keep_services and not force_rebuild)):
 981            return
 982        with self.lock:
 983            try:
 984                keep_services = self.api_client.keep_services().accessible()
 985            except Exception:  # API server predates Keep services.
 986                keep_services = self.api_client.keep_disks().list()
 987
 988            # Gateway services are only used when specified by UUID,
 989            # so there's nothing to gain by filtering them by
 990            # service_type.
 991            self._gateway_services = {ks['uuid']: ks for ks in
 992                                      keep_services.execute()['items']}
 993            if not self._gateway_services:
 994                raise arvados.errors.NoKeepServersError()
 995
 996            # Precompute the base URI for each service.
 997            for r in self._gateway_services.values():
 998                host = r['service_host']
 999                if not host.startswith('[') and host.find(':') >= 0:
1000                    # IPv6 URIs must be formatted like http://[::1]:80/...
1001                    host = '[' + host + ']'
1002                r['_service_root'] = "{}://{}:{:d}/".format(
1003                    'https' if r['service_ssl_flag'] else 'http',
1004                    host,
1005                    r['service_port'])
1006
1007            _logger.debug(str(self._gateway_services))
1008            self._keep_services = [
1009                ks for ks in self._gateway_services.values()
1010                if not ks.get('service_type', '').startswith('gateway:')]
1011            self._writable_services = [ks for ks in self._keep_services
1012                                       if not ks.get('read_only')]
1013
1014            # For disk type services, max_replicas_per_service is 1
1015            # It is unknown (unlimited) for other service types.
1016            if self._any_nondisk_services(self._writable_services):
1017                self.max_replicas_per_service = None
1018            else:
1019                self.max_replicas_per_service = 1
1020
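
The base-URI construction above is plain string formatting, with IPv6 hosts wrapped in brackets; a standalone sketch (hostnames and ports are made up):

def service_root(host, port, ssl):
    # IPv6 literals need brackets in URLs, e.g. http://[::1]:25107/
    if not host.startswith('[') and ':' in host:
        host = '[' + host + ']'
    return "{}://{}:{:d}/".format('https' if ssl else 'http', host, port)

print(service_root("keep0.example.org", 25107, True))  # https://keep0.example.org:25107/
print(service_root("::1", 25107, False))               # http://[::1]:25107/
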
1021    def _service_weight(self, data_hash, service_uuid):
1022        """Compute the weight of a Keep service endpoint for a data
1023        block with a known hash.
1024
1025        The weight is md5(h + u) where u is the last 15 characters of
1026        the service endpoint's UUID.
1027        """
1028        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1029
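
This is rendezvous (highest-random-weight) hashing: every client derives the same per-service weights from the block hash alone, so all clients probe services in the same order without any coordination. A self-contained sketch with hypothetical service UUIDs:

import hashlib

def service_weight(data_hash, service_uuid):
    return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

block_hash = "acbd18db4cc2f85cedef654fccc4a4d8"        # md5 of b"foo"
uuids = ["zzzzz-bi6l4-%015d" % i for i in range(3)]    # made-up service UUIDs
probe_order = sorted(uuids, reverse=True,
                     key=lambda u: service_weight(block_hash, u))
print(probe_order)
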
1030    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1031        """Return an array of Keep service endpoints, in the order in
1032        which they should be probed when reading or writing data with
1033        the given hash+hints.
1034        """
1035        self.build_services_list(force_rebuild)
1036
1037        sorted_roots = []
1038        # Use the services indicated by the given +K@... remote
1039        # service hints, if any are present and can be resolved to a
1040        # URI.
1041        for hint in locator.hints:
1042            if hint.startswith('K@'):
1043                if len(hint) == 7:
1044                    sorted_roots.append(
1045                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1046                elif len(hint) == 29:
1047                    svc = self._gateway_services.get(hint[2:])
1048                    if svc:
1049                        sorted_roots.append(svc['_service_root'])
1050
1051        # Sort the available local services by weight (heaviest first)
1052        # for this locator, and return their service_roots (base URIs)
1053        # in that order.
1054        use_services = self._keep_services
1055        if need_writable:
1056            use_services = self._writable_services
1057        self.using_proxy = self._any_nondisk_services(use_services)
1058        sorted_roots.extend([
1059            svc['_service_root'] for svc in sorted(
1060                use_services,
1061                reverse=True,
1062                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1063        _logger.debug("{}: {}".format(locator, sorted_roots))
1064        return sorted_roots
1065
1066    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1067        # roots_map is a dictionary, mapping Keep service root strings
1068        # to KeepService objects.  Poll for Keep services, and add any
1069        # new ones to roots_map.  Return the current list of local
1070        # root strings.
1071        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1072        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1073        for root in local_roots:
1074            if root not in roots_map:
1075                roots_map[root] = self.KeepService(
1076                    root, self._user_agent_pool,
1077                    upload_counter=self.upload_counter,
1078                    download_counter=self.download_counter,
1079                    headers=headers,
1080                    insecure=self.insecure)
1081        return local_roots
1082
1083    @staticmethod
1084    def _check_loop_result(result):
1085        # KeepClient RetryLoops should save results as a 2-tuple: the
1086        # actual result of the request, and the number of servers available
1087        # to receive the request this round.
1088        # This method returns True if there's a real result, False if
1089        # there are no more servers available, otherwise None.
1090        if isinstance(result, Exception):
1091            return None
1092        result, tried_server_count = result
1093        if (result is not None) and (result is not False):
1094            return True
1095        elif tried_server_count < 1:
1096            _logger.info("No more Keep services to try; giving up")
1097            return False
1098        else:
1099            return None
1100
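
Restated as a tiny standalone check, the loop-result convention looks like this (values are illustrative):

def check(result):
    # Same decision rule as _check_loop_result(): True = done, False = give up,
    # None = inconclusive, so retry if attempts remain.
    if isinstance(result, Exception):
        return None
    value, tried = result
    if value is not None and value is not False:
        return True
    return False if tried < 1 else None

assert check(IOError("boom")) is None   # request raised an exception -> retry
assert check((b"data", 3)) is True      # got a real result           -> success
assert check((None, 0)) is False        # no servers left to try      -> give up
assert check((None, 2)) is None         # every probe failed          -> retry
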
1101    def get_from_cache(self, loc_s):
1102        """Fetch a block only if it is in the cache, otherwise return None."""
1103        locator = KeepLocator(loc_s)
1104        slot = self.block_cache.get(locator.md5sum)
1105        if slot is not None and slot.ready.is_set():
1106            return slot.get()
1107        else:
1108            return None
1109
1110    def refresh_signature(self, loc):
1111        """Ask Keep to get the remote block and return its local signature"""
1112        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1113        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1114
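
A usage sketch: ask the cluster to re-sign a locator whose permission token is stale. The locator below is invented, and `keep_client` is assumed to be a KeepClient built as above:

stale = "acbd18db4cc2f85cedef654fccc4a4d8+3+A0123456789abcdef0123456789abcdef01234567@5f000000"
fresh = keep_client.refresh_signature(stale)   # HEAD request carrying X-Keep-Signature
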
1115    @retry.retry_method
1116    def head(self, loc_s, **kwargs):
1117        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1118
1119    @retry.retry_method
1120    def get(self, loc_s, **kwargs):
1121        return self._get_or_head(loc_s, method="GET", **kwargs)
1122
1123    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
1124        """Get data from Keep.
1125
1126        This method fetches one or more blocks of data from Keep.  It
1127        sends a request to each Keep service registered with the API
1128        server (or the proxy provided when this client was
1129        instantiated), then each service named in location hints, in
1130        sequence.  As soon as one service provides the data, it's
1131        returned.
1132
1133        Arguments:
1134        * loc_s: A string of one or more comma-separated locators to fetch.
1135          This method returns the concatenation of these blocks.
1136        * num_retries: The number of times to retry GET requests to
1137          *each* Keep server if it returns temporary failures, with
1138          exponential backoff.  Note that, in each loop, the method may try
1139          to fetch data from every available Keep service, along with any
1140          that are named in location hints in the locator.  The default value
1141          is set when the KeepClient is initialized.
1142        """
1143        if ',' in loc_s:
1144            return ''.join(self.get(x) for x in loc_s.split(','))
1145
1146        self.get_counter.add(1)
1147
1148        request_id = (request_id or
1149                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1150                      arvados.util.new_request_id())
1151        if headers is None:
1152            headers = {}
1153        headers['X-Request-Id'] = request_id
1154
1155        slot = None
1156        blob = None
1157        try:
1158            locator = KeepLocator(loc_s)
1159            if method == "GET":
1160                while slot is None:
1161                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
1162                    if first:
1163                        # Fresh and empty "first time it is used" slot
1164                        break
1165                    if prefetch:
1166                        # this is a request for a prefetch to fill in
1167                        # the cache, don't need to wait for the
1168                        # result, so if it is already in flight return
1169                        # immediately.  Clear 'slot' to prevent
1170                        # finally block from calling slot.set()
1171                        if slot.ready.is_set():
1172                            slot.get()
1173                        slot = None
1174                        return None
1175
1176                    blob = slot.get()
1177                    if blob is not None:
1178                        self.hits_counter.add(1)
1179                        return blob
1180
1181                    # If blob is None, this means either
1182                    #
1183                    # (a) another thread was fetching this block and
1184                    # failed with an error or
1185                    #
1186                    # (b) cache thrashing caused the slot to be
1187                    # evicted (content set to None) by another thread
1188                    # between the call to reserve_cache() and get().
1189                    #
1190                    # We'll handle these cases by reserving a new slot
1191                    # and then doing a full GET request.
1192                    slot = None
1193
1194            self.misses_counter.add(1)
1195
1196            # If the locator has hints specifying a prefix (indicating a
1197            # remote keepproxy) or the UUID of a local gateway service,
1198            # read data from the indicated service(s) instead of the usual
1199            # list of local disk services.
1200            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1201                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1202            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1203                               for hint in locator.hints if (
1204                                       hint.startswith('K@') and
1205                                       len(hint) == 29 and
1206                                       self._gateway_services.get(hint[2:])
1207                                       )])
1208            # Map root URLs to their KeepService objects.
1209            roots_map = {
1210                root: self.KeepService(root, self._user_agent_pool,
1211                                       upload_counter=self.upload_counter,
1212                                       download_counter=self.download_counter,
1213                                       headers=headers,
1214                                       insecure=self.insecure)
1215                for root in hint_roots
1216            }
1217
1218            # See #3147 for a discussion of the loop implementation.  Highlights:
1219            # * Refresh the list of Keep services after each failure, in case
1220            #   it's being updated.
1221            # * Retry until we succeed, we're out of retries, or every available
1222            #   service has returned permanent failure.
1223            sorted_roots = []
1224            roots_map = {}
1225            loop = retry.RetryLoop(num_retries, self._check_loop_result,
1226                                   backoff_start=2)
1227            for tries_left in loop:
1228                try:
1229                    sorted_roots = self.map_new_services(
1230                        roots_map, locator,
1231                        force_rebuild=(tries_left < num_retries),
1232                        need_writable=False,
1233                        headers=headers)
1234                except Exception as error:
1235                    loop.save_result(error)
1236                    continue
1237
1238                # Query KeepService objects that haven't returned
1239                # permanent failure, in our specified shuffle order.
1240                services_to_try = [roots_map[root]
1241                                   for root in sorted_roots
1242                                   if roots_map[root].usable()]
1243                for keep_service in services_to_try:
1244                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1245                    if blob is not None:
1246                        break
1247                loop.save_result((blob, len(services_to_try)))
1248
1249            # Always cache the result, then return it if we succeeded.
1250            if loop.success():
1251                return blob
1252        finally:
1253            if slot is not None:
1254                self.block_cache.set(slot, blob)
1255
1256        # Q: Including 403 is necessary for the Keep tests to continue
1257        # passing, but maybe they should expect KeepReadError instead?
1258        not_founds = sum(1 for key in sorted_roots
1259                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1260        service_errors = ((key, roots_map[key].last_result()['error'])
1261                          for key in sorted_roots)
1262        if not roots_map:
1263            raise arvados.errors.KeepReadError(
1264                "[{}] failed to read {}: no Keep services available ({})".format(
1265                    request_id, loc_s, loop.last_result()))
1266        elif not_founds == len(sorted_roots):
1267            raise arvados.errors.NotFoundError(
1268                "[{}] {} not found".format(request_id, loc_s), service_errors)
1269        else:
1270            raise arvados.errors.KeepReadError(
1271                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1272
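
A read-path sketch, assuming `keep_client` was built as above; the locator is just the md5 of b"foo" plus its size:

import hashlib

loc = "acbd18db4cc2f85cedef654fccc4a4d8+3"
data = keep_client.get(loc)                        # block contents as bytes
assert hashlib.md5(data).hexdigest() == loc.split('+')[0]
keep_client.head(loc)                              # probe for existence without downloading
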
1273    @retry.retry_method
1274    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1275        """Save data in Keep.
1276
1277        This method will get a list of Keep services from the API server, and
1278        send the data to each one simultaneously in a new thread.  Once the
1279        uploads are finished, if enough copies are saved, this method returns
1280        the most recent HTTP response body.  If requests fail to upload
1281        enough copies, this method raises KeepWriteError.
1282
1283        Arguments:
1284        * data: The string of data to upload.
1285        * copies: The number of copies that the user requires be saved.
1286          Default 2.
1287        * num_retries: The number of times to retry PUT requests to
1288          *each* Keep server if it returns temporary failures, with
1289          exponential backoff.  The default value is set when the
1290          KeepClient is initialized.
1291        * classes: An optional list of storage class names where copies should
1292          be written.
1293        """
1294
1295        classes = classes or self._default_classes
1296
1297        if not isinstance(data, bytes):
1298            data = data.encode()
1299
1300        self.put_counter.add(1)
1301
1302        data_hash = hashlib.md5(data).hexdigest()
1303        loc_s = data_hash + '+' + str(len(data))
1304        if copies < 1:
1305            return loc_s
1306        locator = KeepLocator(loc_s)
1307
1308        request_id = (request_id or
1309                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1310                      arvados.util.new_request_id())
1311        headers = {
1312            'X-Request-Id': request_id,
1313            'X-Keep-Desired-Replicas': str(copies),
1314        }
1315        roots_map = {}
1316        loop = retry.RetryLoop(num_retries, self._check_loop_result,
1317                               backoff_start=2)
1318        done_copies = 0
1319        done_classes = []
1320        for tries_left in loop:
1321            try:
1322                sorted_roots = self.map_new_services(
1323                    roots_map, locator,
1324                    force_rebuild=(tries_left < num_retries),
1325                    need_writable=True,
1326                    headers=headers)
1327            except Exception as error:
1328                loop.save_result(error)
1329                continue
1330
1331            pending_classes = []
1332            if done_classes is not None:
1333                pending_classes = list(set(classes) - set(done_classes))
1334            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1335                                                        data_hash=data_hash,
1336                                                        copies=copies - done_copies,
1337                                                        max_service_replicas=self.max_replicas_per_service,
1338                                                        timeout=self.current_timeout(num_retries - tries_left),
1339                                                        classes=pending_classes)
1340            for service_root, ks in [(root, roots_map[root])
1341                                     for root in sorted_roots]:
1342                if ks.finished():
1343                    continue
1344                writer_pool.add_task(ks, service_root)
1345            writer_pool.join()
1346            pool_copies, pool_classes = writer_pool.done()
1347            done_copies += pool_copies
1348            if (done_classes is not None) and (pool_classes is not None):
1349                done_classes += pool_classes
1350                loop.save_result(
1351                    (done_copies >= copies and set(done_classes) == set(classes),
1352                    writer_pool.total_task_nr))
1353            else:
1354                # Old keepstore contacted without storage classes support:
1355                # success is determined only by successful copies.
1356                #
1357                # Disable storage classes tracking from this point forward.
1358                if not self._storage_classes_unsupported_warning:
1359                    self._storage_classes_unsupported_warning = True
1360                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1361                done_classes = None
1362                loop.save_result(
1363                    (done_copies >= copies, writer_pool.total_task_nr))
1364
1365        if loop.success():
1366            return writer_pool.response()
1367        if not roots_map:
1368            raise arvados.errors.KeepWriteError(
1369                "[{}] failed to write {}: no Keep services available ({})".format(
1370                    request_id, data_hash, loop.last_result()))
1371        else:
1372            service_errors = ((key, roots_map[key].last_result()['error'])
1373                              for key in sorted_roots
1374                              if roots_map[key].last_result()['error'])
1375            raise arvados.errors.KeepWriteError(
1376                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1377                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1378
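
The matching write-path sketch; storage class names are cluster-defined, so "default" here is only an assumption:

locator = keep_client.put(b"example data", copies=2)
locator = keep_client.put(b"example data", copies=2, classes=["default"])
print(locator)   # signed locator returned by keepstore, e.g. "<md5>+12+A...@..."
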
1379    def _block_prefetch_worker(self):
1380        """The background downloader thread."""
1381        while True:
1382            try:
1383                b = self._prefetch_queue.get()
1384                if b is None:
1385                    return
1386                self.get(b, prefetch=True)
1387            except Exception:
1388                _logger.exception("Exception doing block prefetch")
1389
1390    def _start_prefetch_threads(self):
1391        if self._prefetch_threads is None:
1392            with self.lock:
1393                if self._prefetch_threads is not None:
1394                    return
1395                self._prefetch_queue = queue.Queue()
1396                self._prefetch_threads = []
1397                for i in range(0, self.num_prefetch_threads):
1398                    thread = threading.Thread(target=self._block_prefetch_worker)
1399                    self._prefetch_threads.append(thread)
1400                    thread.daemon = True
1401                    thread.start()
1402
1403    def block_prefetch(self, locator):
1404        """
1405        Queue a non-blocking background download of the given block.
1406        Because KeepClient implements a block cache, repeated requests for
1407        the same block will not result in repeated downloads (unless the
1408        block is evicted from the cache).
1409        """
1410
1411        if self.block_cache.get(locator) is not None:
1412            return
1413
1414        self._start_prefetch_threads()
1415        self._prefetch_queue.put(locator)
1416
1417    def stop_prefetch_threads(self):
1418        with self.lock:
1419            if self._prefetch_threads is not None:
1420                for t in self._prefetch_threads:
1421                    self._prefetch_queue.put(None)
1422                for t in self._prefetch_threads:
1423                    t.join()
1424            self._prefetch_threads = None
1425            self._prefetch_queue = None
1426
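
Prefetching is fire-and-forget: queue locators you expect to read soon, read them normally, and shut the workers down when finished. A sketch (locator value invented):

upcoming = ["acbd18db4cc2f85cedef654fccc4a4d8+3"]
for loc in upcoming:
    keep_client.block_prefetch(loc)      # non-blocking; workers fill the block cache
data = keep_client.get(upcoming[0])      # served from the cache if the prefetch finished
keep_client.stop_prefetch_threads()
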
1427    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1428        """A stub for put().
1429
1430        This method is used in place of the real put() method when
1431        using local storage (see constructor's local_store argument).
1432
1433        copies and num_retries arguments are ignored: they are here
1434        only for the sake of offering the same call signature as
1435        put().
1436
1437        Data stored this way can be retrieved via local_store_get().
1438        """
1439        md5 = hashlib.md5(data).hexdigest()
1440        locator = '%s+%d' % (md5, len(data))
1441        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1442            f.write(data)
1443        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1444                  os.path.join(self.local_store, md5))
1445        return locator
1446
1447    def local_store_get(self, loc_s, num_retries=None):
1448        """Companion to local_store_put()."""
1449        try:
1450            locator = KeepLocator(loc_s)
1451        except ValueError:
1452            raise arvados.errors.NotFoundError(
1453                "Invalid data locator: '%s'" % loc_s)
1454        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1455            return b''
1456        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1457            return f.read()
1458
1459    def local_store_head(self, loc_s, num_retries=None):
1460        """Companion to local_store_put()."""
1461        try:
1462            locator = KeepLocator(loc_s)
1463        except ValueError:
1464            raise arvados.errors.NotFoundError(
1465                "Invalid data locator: '%s'" % loc_s)
1466        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1467            return True
1468        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1469            return True
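
The local-store stubs make it possible to exercise Keep-using code without a cluster; a sketch using a temporary directory (KeepClient swaps in the local_store_* methods automatically when local_store is set):

import tempfile
import arvados.keep

with tempfile.TemporaryDirectory() as scratch:
    kc = arvados.keep.KeepClient(local_store=scratch)
    loc = kc.put(b"hello")            # handled by local_store_put()
    assert kc.get(loc) == b"hello"    # handled by local_store_get()
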
KeepClient( api_client=None, proxy=None, timeout=(2, 256, 32768), proxy_timeout=(20, 256, 32768), api_token=None, local_store=None, block_cache=None, num_retries=10, session=None, num_prefetch_threads=None)
812    def __init__(self, api_client=None, proxy=None,
813                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
814                 api_token=None, local_store=None, block_cache=None,
815                 num_retries=10, session=None, num_prefetch_threads=None):
816        """Initialize a new KeepClient.
817
818        Arguments:
819        :api_client:
820          The API client to use to find Keep services.  If not
821          provided, KeepClient will build one from available Arvados
822          configuration.
823
824        :proxy:
825          If specified, this KeepClient will send requests to this Keep
826          proxy.  Otherwise, KeepClient will fall back to the setting of the
827          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
828          If you want to KeepClient does not use a proxy, pass in an empty
829          string.
830
831        :timeout:
832          The initial timeout (in seconds) for HTTP requests to Keep
833          non-proxy servers.  A tuple of three floats is interpreted as
834          (connection_timeout, read_timeout, minimum_bandwidth). A connection
835          will be aborted if the average traffic rate falls below
836          minimum_bandwidth bytes per second over an interval of read_timeout
837          seconds. Because timeouts are often a result of transient server
838          load, the actual connection timeout will be increased by a factor
839          of two on each retry.
840          Default: (2, 256, 32768).
841
842        :proxy_timeout:
843          The initial timeout (in seconds) for HTTP requests to
844          Keep proxies. A tuple of three floats is interpreted as
845          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
846          described above for adjusting connection timeouts on retry also
847          applies.
848          Default: (20, 256, 32768).
849
850        :api_token:
851          If you're not using an API client, but only talking
852          directly to a Keep proxy, this parameter specifies an API token
853          to authenticate Keep requests.  It is an error to specify both
854          api_client and api_token.  If you specify neither, KeepClient
855          will use one available from the Arvados configuration.
856
857        :local_store:
858          If specified, this KeepClient will bypass Keep
859          services, and save data to the named directory.  If unspecified,
860          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
861          environment variable.  If you want to ensure KeepClient does not
862          use local storage, pass in an empty string.  This is primarily
863          intended to mock a server for testing.
864
865        :num_retries:
866          The default number of times to retry failed requests.
867          This will be used as the default num_retries value when get() and
868          put() are called.  Default 10.
869        """
870        self.lock = threading.Lock()
871        if proxy is None:
872            if config.get('ARVADOS_KEEP_SERVICES'):
873                proxy = config.get('ARVADOS_KEEP_SERVICES')
874            else:
875                proxy = config.get('ARVADOS_KEEP_PROXY')
876        if api_token is None:
877            if api_client is None:
878                api_token = config.get('ARVADOS_API_TOKEN')
879            else:
880                api_token = api_client.api_token
881        elif api_client is not None:
882            raise ValueError(
883                "can't build KeepClient with both API client and token")
884        if local_store is None:
885            local_store = os.environ.get('KEEP_LOCAL_STORE')
886
887        if api_client is None:
888            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
889        else:
890            self.insecure = api_client.insecure
891
892        self.block_cache = block_cache if block_cache else KeepBlockCache()
893        self.timeout = timeout
894        self.proxy_timeout = proxy_timeout
895        self._user_agent_pool = queue.LifoQueue()
896        self.upload_counter = Counter()
897        self.download_counter = Counter()
898        self.put_counter = Counter()
899        self.get_counter = Counter()
900        self.hits_counter = Counter()
901        self.misses_counter = Counter()
902        self._storage_classes_unsupported_warning = False
903        self._default_classes = []
904        if num_prefetch_threads is not None:
905            self.num_prefetch_threads = num_prefetch_threads
906        else:
907            self.num_prefetch_threads = 2
908        self._prefetch_queue = None
909        self._prefetch_threads = None
910
911        if local_store:
912            self.local_store = local_store
913            self.head = self.local_store_head
914            self.get = self.local_store_get
915            self.put = self.local_store_put
916        else:
917            self.num_retries = num_retries
918            self.max_replicas_per_service = None
919            if proxy:
920                proxy_uris = proxy.split()
921                for i in range(len(proxy_uris)):
922                    if not proxy_uris[i].endswith('/'):
923                        proxy_uris[i] += '/'
924                    # URL validation
925                    url = urllib.parse.urlparse(proxy_uris[i])
926                    if not (url.scheme and url.netloc):
927                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
928                self.api_token = api_token
929                self._gateway_services = {}
930                self._keep_services = [{
931                    'uuid': "00000-bi6l4-%015d" % idx,
932                    'service_type': 'proxy',
933                    '_service_root': uri,
934                    } for idx, uri in enumerate(proxy_uris)]
935                self._writable_services = self._keep_services
936                self.using_proxy = True
937                self._static_services_list = True
938            else:
939                # It's important to avoid instantiating an API client
940                # unless we actually need one, for testing's sake.
941                if api_client is None:
942                    api_client = arvados.api('v1')
943                self.api_client = api_client
944                self.api_token = api_client.api_token
945                self._gateway_services = {}
946                self._keep_services = None
947                self._writable_services = None
948                self.using_proxy = None
949                self._static_services_list = False
950                try:
951                    self._default_classes = [
952                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
953                except KeyError:
954                    # We're talking to an old cluster
955                    pass

Initialize a new KeepClient.

Arguments: :api_client: The API client to use to find Keep services. If not provided, KeepClient will build one from available Arvados configuration.

:proxy: If specified, this KeepClient will send requests to this Keep proxy. Otherwise, KeepClient will fall back to the setting of the ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings. If you want to KeepClient does not use a proxy, pass in an empty string.

:timeout: The initial timeout (in seconds) for HTTP requests to Keep non-proxy servers. A tuple of three floats is interpreted as (connection_timeout, read_timeout, minimum_bandwidth). A connection will be aborted if the average traffic rate falls below minimum_bandwidth bytes per second over an interval of read_timeout seconds. Because timeouts are often a result of transient server load, the actual connection timeout will be increased by a factor of two on each retry. Default: (2, 256, 32768).

:proxy_timeout: The initial timeout (in seconds) for HTTP requests to Keep proxies. A tuple of three floats is interpreted as (connection_timeout, read_timeout, minimum_bandwidth). The behavior described above for adjusting connection timeouts on retry also applies. Default: (20, 256, 32768).

:api_token: If you’re not using an API client, but only talking directly to a Keep proxy, this parameter specifies an API token to authenticate Keep requests. It is an error to specify both api_client and api_token. If you specify neither, KeepClient will use one available from the Arvados configuration.

:local_store: If specified, this KeepClient will bypass Keep services, and save data to the named directory. If unspecified, KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE environment variable. If you want to ensure KeepClient does not use local storage, pass in an empty string. This is primarily intended to mock a server for testing.

:num_retries: The default number of times to retry failed requests. This will be used as the default num_retries value when get() and put() are called. Default 10.

DEFAULT_TIMEOUT = (2, 256, 32768)
DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
lock
block_cache
timeout
proxy_timeout
upload_counter
download_counter
put_counter
get_counter
hits_counter
misses_counter
def current_timeout(self, attempt_number):
957    def current_timeout(self, attempt_number):
958        """Return the appropriate timeout to use for this client.
959
960        The proxy timeout setting if the backend service is currently a proxy,
961        the regular timeout setting otherwise.  The `attempt_number` indicates
962        how many times the operation has been tried already (starting from 0
963        for the first try), and scales the connection timeout portion of the
964        return value accordingly.
965
966        """
967        # TODO(twp): the timeout should be a property of a
968        # KeepService, not a KeepClient. See #4488.
969        t = self.proxy_timeout if self.using_proxy else self.timeout
970        if len(t) == 2:
971            return (t[0] * (1 << attempt_number), t[1])
972        else:
973            return (t[0] * (1 << attempt_number), t[1], t[2])

Return the appropriate timeout to use for this client.

The proxy timeout setting if the backend service is currently a proxy, the regular timeout setting otherwise. The attempt_number indicates how many times the operation has been tried already (starting from 0 for the first try), and scales the connection timeout portion of the return value accordingly.

def build_services_list(self, force_rebuild=False):
 978    def build_services_list(self, force_rebuild=False):
 979        if (self._static_services_list or
 980              (self._keep_services and not force_rebuild)):
 981            return
 982        with self.lock:
 983            try:
 984                keep_services = self.api_client.keep_services().accessible()
 985            except Exception:  # API server predates Keep services.
 986                keep_services = self.api_client.keep_disks().list()
 987
 988            # Gateway services are only used when specified by UUID,
 989            # so there's nothing to gain by filtering them by
 990            # service_type.
 991            self._gateway_services = {ks['uuid']: ks for ks in
 992                                      keep_services.execute()['items']}
 993            if not self._gateway_services:
 994                raise arvados.errors.NoKeepServersError()
 995
 996            # Precompute the base URI for each service.
 997            for r in self._gateway_services.values():
 998                host = r['service_host']
 999                if not host.startswith('[') and host.find(':') >= 0:
1000                    # IPv6 URIs must be formatted like http://[::1]:80/...
1001                    host = '[' + host + ']'
1002                r['_service_root'] = "{}://{}:{:d}/".format(
1003                    'https' if r['service_ssl_flag'] else 'http',
1004                    host,
1005                    r['service_port'])
1006
1007            _logger.debug(str(self._gateway_services))
1008            self._keep_services = [
1009                ks for ks in self._gateway_services.values()
1010                if not ks.get('service_type', '').startswith('gateway:')]
1011            self._writable_services = [ks for ks in self._keep_services
1012                                       if not ks.get('read_only')]
1013
1014            # For disk type services, max_replicas_per_service is 1
1015            # It is unknown (unlimited) for other service types.
1016            if self._any_nondisk_services(self._writable_services):
1017                self.max_replicas_per_service = None
1018            else:
1019                self.max_replicas_per_service = 1
def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1030    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1031        """Return an array of Keep service endpoints, in the order in
1032        which they should be probed when reading or writing data with
1033        the given hash+hints.
1034        """
1035        self.build_services_list(force_rebuild)
1036
1037        sorted_roots = []
1038        # Use the services indicated by the given +K@... remote
1039        # service hints, if any are present and can be resolved to a
1040        # URI.
1041        for hint in locator.hints:
1042            if hint.startswith('K@'):
1043                if len(hint) == 7:
1044                    sorted_roots.append(
1045                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1046                elif len(hint) == 29:
1047                    svc = self._gateway_services.get(hint[2:])
1048                    if svc:
1049                        sorted_roots.append(svc['_service_root'])
1050
1051        # Sort the available local services by weight (heaviest first)
1052        # for this locator, and return their service_roots (base URIs)
1053        # in that order.
1054        use_services = self._keep_services
1055        if need_writable:
1056            use_services = self._writable_services
1057        self.using_proxy = self._any_nondisk_services(use_services)
1058        sorted_roots.extend([
1059            svc['_service_root'] for svc in sorted(
1060                use_services,
1061                reverse=True,
1062                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1063        _logger.debug("{}: {}".format(locator, sorted_roots))
1064        return sorted_roots

Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing data with the given hash+hints.

def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1066    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1067        # roots_map is a dictionary, mapping Keep service root strings
1068        # to KeepService objects.  Poll for Keep services, and add any
1069        # new ones to roots_map.  Return the current list of local
1070        # root strings.
1071        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1072        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1073        for root in local_roots:
1074            if root not in roots_map:
1075                roots_map[root] = self.KeepService(
1076                    root, self._user_agent_pool,
1077                    upload_counter=self.upload_counter,
1078                    download_counter=self.download_counter,
1079                    headers=headers,
1080                    insecure=self.insecure)
1081        return local_roots
def get_from_cache(self, loc_s):
1101    def get_from_cache(self, loc_s):
1102        """Fetch a block only if is in the cache, otherwise return None."""
1103        locator = KeepLocator(loc_s)
1104        slot = self.block_cache.get(locator.md5sum)
1105        if slot is not None and slot.ready.is_set():
1106            return slot.get()
1107        else:
1108            return None

Fetch a block only if is in the cache, otherwise return None.

def refresh_signature(self, loc):
1110    def refresh_signature(self, loc):
1111        """Ask Keep to get the remote block and return its local signature"""
1112        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1113        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

Ask Keep to get the remote block and return its local signature

@retry.retry_method
def head(self, loc_s, **kwargs):
1115    @retry.retry_method
1116    def head(self, loc_s, **kwargs):
1117        return self._get_or_head(loc_s, method="HEAD", **kwargs)
@retry.retry_method
def get(self, loc_s, **kwargs):
1119    @retry.retry_method
1120    def get(self, loc_s, **kwargs):
1121        return self._get_or_head(loc_s, method="GET", **kwargs)
@retry.retry_method
def put( self, data, copies=2, num_retries=None, request_id=None, classes=None):
1273    @retry.retry_method
1274    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1275        """Save data in Keep.
1276
1277        This method will get a list of Keep services from the API server, and
1278        send the data to each one simultaneously in a new thread.  Once the
1279        uploads are finished, if enough copies are saved, this method returns
1280        the most recent HTTP response body.  If requests fail to upload
1281        enough copies, this method raises KeepWriteError.
1282
1283        Arguments:
1284        * data: The string of data to upload.
1285        * copies: The number of copies that the user requires be saved.
1286          Default 2.
1287        * num_retries: The number of times to retry PUT requests to
1288          *each* Keep server if it returns temporary failures, with
1289          exponential backoff.  The default value is set when the
1290          KeepClient is initialized.
1291        * classes: An optional list of storage class names where copies should
1292          be written.
1293        """
1294
1295        classes = classes or self._default_classes
1296
1297        if not isinstance(data, bytes):
1298            data = data.encode()
1299
1300        self.put_counter.add(1)
1301
1302        data_hash = hashlib.md5(data).hexdigest()
1303        loc_s = data_hash + '+' + str(len(data))
1304        if copies < 1:
1305            return loc_s
1306        locator = KeepLocator(loc_s)
1307
1308        request_id = (request_id or
1309                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1310                      arvados.util.new_request_id())
1311        headers = {
1312            'X-Request-Id': request_id,
1313            'X-Keep-Desired-Replicas': str(copies),
1314        }
1315        roots_map = {}
1316        loop = retry.RetryLoop(num_retries, self._check_loop_result,
1317                               backoff_start=2)
1318        done_copies = 0
1319        done_classes = []
1320        for tries_left in loop:
1321            try:
1322                sorted_roots = self.map_new_services(
1323                    roots_map, locator,
1324                    force_rebuild=(tries_left < num_retries),
1325                    need_writable=True,
1326                    headers=headers)
1327            except Exception as error:
1328                loop.save_result(error)
1329                continue
1330
1331            pending_classes = []
1332            if done_classes is not None:
1333                pending_classes = list(set(classes) - set(done_classes))
1334            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1335                                                        data_hash=data_hash,
1336                                                        copies=copies - done_copies,
1337                                                        max_service_replicas=self.max_replicas_per_service,
1338                                                        timeout=self.current_timeout(num_retries - tries_left),
1339                                                        classes=pending_classes)
1340            for service_root, ks in [(root, roots_map[root])
1341                                     for root in sorted_roots]:
1342                if ks.finished():
1343                    continue
1344                writer_pool.add_task(ks, service_root)
1345            writer_pool.join()
1346            pool_copies, pool_classes = writer_pool.done()
1347            done_copies += pool_copies
1348            if (done_classes is not None) and (pool_classes is not None):
1349                done_classes += pool_classes
1350                loop.save_result(
1351                    (done_copies >= copies and set(done_classes) == set(classes),
1352                    writer_pool.total_task_nr))
1353            else:
1354                # Old keepstore contacted without storage classes support:
1355                # success is determined only by successful copies.
1356                #
1357                # Disable storage classes tracking from this point forward.
1358                if not self._storage_classes_unsupported_warning:
1359                    self._storage_classes_unsupported_warning = True
1360                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1361                done_classes = None
1362                loop.save_result(
1363                    (done_copies >= copies, writer_pool.total_task_nr))
1364
1365        if loop.success():
1366            return writer_pool.response()
1367        if not roots_map:
1368            raise arvados.errors.KeepWriteError(
1369                "[{}] failed to write {}: no Keep services available ({})".format(
1370                    request_id, data_hash, loop.last_result()))
1371        else:
1372            service_errors = ((key, roots_map[key].last_result()['error'])
1373                              for key in sorted_roots
1374                              if roots_map[key].last_result()['error'])
1375            raise arvados.errors.KeepWriteError(
1376                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1377                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")

Save data in Keep.

This method gets a list of Keep services from the API server and sends the data to each one simultaneously, each in its own thread. Once the uploads are finished, if enough copies are saved, this method returns the most recent HTTP response body. If the requests fail to upload enough copies, this method raises KeepWriteError.

Arguments:

  • data: The string of data to upload.
  • copies: The number of copies that the user requires be saved. Default 2.
  • num_retries: The number of times to retry PUT requests to each Keep server if it returns temporary failures, with exponential backoff. The default value is set when the KeepClient is initialized.
  • classes: An optional list of storage class names where copies should be written.
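
A minimal usage sketch of put() together with get() (this assumes an Arvados API host and token are configured in the environment and that the cluster defines a storage class named 'default'; the printed locator format is illustrative):

    import arvados
    import arvados.keep

    # Assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are set in the environment.
    api = arvados.api('v1')
    keep = arvados.keep.KeepClient(api_client=api)

    data = b'hello keep'
    # Ask for two replicas, written to the (assumed) 'default' storage class.
    locator = keep.put(data, copies=2, classes=['default'])
    print(locator)                      # e.g. "<md5>+10+A<signature>@<expiry>"
    assert keep.get(locator) == data    # read the block back
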
def block_prefetch(self, locator):
1403    def block_prefetch(self, locator):
1404        """
1405        This relies on the fact that KeepClient implements a block cache,
1406        so repeated requests for the same block will not result in repeated
1407        downloads (unless the block is evicted from the cache.)  This method
1408        does not block.
1409        """
1410
1411        if self.block_cache.get(locator) is not None:
1412            return
1413
1414        self._start_prefetch_threads()
1415        self._prefetch_queue.put(locator)

Start fetching a block in the background. This relies on the fact that KeepClient implements a block cache, so repeated requests for the same block will not result in repeated downloads (unless the block is evicted from the cache). This method does not block.
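
A small sketch of how prefetching is typically used ('keep' and 'block_locators' are assumed names: an existing KeepClient and a list of block locators, e.g. taken from a collection manifest):

    # Warm the block cache in the background; each call returns immediately.
    for loc in block_locators:
        keep.block_prefetch(loc)

    # Later reads are served from the block cache once the background fetch
    # finishes, so this get() may not need to contact a Keep service again.
    first_block = keep.get(block_locators[0])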

def stop_prefetch_threads(self):
1417    def stop_prefetch_threads(self):
1418        with self.lock:
1419            if self._prefetch_threads is not None:
1420                for t in self._prefetch_threads:
1421                    self._prefetch_queue.put(None)
1422                for t in self._prefetch_threads:
1423                    t.join()
1424            self._prefetch_threads = None
1425            self._prefetch_queue = None
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1427    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1428        """A stub for put().
1429
1430        This method is used in place of the real put() method when
1431        using local storage (see constructor's local_store argument).
1432
1433        copies and num_retries arguments are ignored: they are here
1434        only for the sake of offering the same call signature as
1435        put().
1436
1437        Data stored this way can be retrieved via local_store_get().
1438        """
1439        md5 = hashlib.md5(data).hexdigest()
1440        locator = '%s+%d' % (md5, len(data))
1441        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1442            f.write(data)
1443        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1444                  os.path.join(self.local_store, md5))
1445        return locator

A stub for put().

This method is used in place of the real put() method when using local storage (see constructor’s local_store argument).

copies and num_retries arguments are ignored: they are here only for the sake of offering the same call signature as put().

Data stored this way can be retrieved via local_store_get().
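
A sketch of the local-store mode these stubs implement (the directory path is illustrative; per the constructor's local_store argument, put(), get() and head() are expected to be routed to the local_store_* methods):

    import os
    import arvados.keep

    os.makedirs('/tmp/keep_local_store', exist_ok=True)
    keep = arvados.keep.KeepClient(local_store='/tmp/keep_local_store')

    loc = keep.put(b'some data')          # dispatched to local_store_put()
    assert keep.get(loc) == b'some data'  # dispatched to local_store_get()
    assert keep.head(loc) is True         # dispatched to local_store_head()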

def local_store_get(self, loc_s, num_retries=None):
1447    def local_store_get(self, loc_s, num_retries=None):
1448        """Companion to local_store_put()."""
1449        try:
1450            locator = KeepLocator(loc_s)
1451        except ValueError:
1452            raise arvados.errors.NotFoundError(
1453                "Invalid data locator: '%s'" % loc_s)
1454        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1455            return b''
1456        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1457            return f.read()

Companion to local_store_put().

def local_store_head(self, loc_s, num_retries=None):
1459    def local_store_head(self, loc_s, num_retries=None):
1460        """Companion to local_store_put()."""
1461        try:
1462            locator = KeepLocator(loc_s)
1463        except ValueError:
1464            raise arvados.errors.NotFoundError(
1465                "Invalid data locator: '%s'" % loc_s)
1466        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1467            return True
1468        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1469            return True

Companion to local_store_put().

class KeepClient.KeepService(arvados._pycurlhelper.PyCurlHelper):
389    class KeepService(PyCurlHelper):
390        """Make requests to a single Keep service, and track results.
391
392        A KeepService is intended to last long enough to perform one
393        transaction (GET or PUT) against one Keep service. This can
394        involve calling either get() or put() multiple times in order
395        to retry after transient failures. However, calling both get()
396        and put() on a single instance -- or using the same instance
397        to access two different Keep services -- will not produce
398        sensible behavior.
399        """
400
401        HTTP_ERRORS = (
402            socket.error,
403            ssl.SSLError,
404            arvados.errors.HttpError,
405        )
406
407        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
408                     upload_counter=None,
409                     download_counter=None,
410                     headers={},
411                     insecure=False):
412            super(KeepClient.KeepService, self).__init__()
413            self.root = root
414            self._user_agent_pool = user_agent_pool
415            self._result = {'error': None}
416            self._usable = True
417            self._session = None
418            self._socket = None
419            self.get_headers = {'Accept': 'application/octet-stream'}
420            self.get_headers.update(headers)
421            self.put_headers = headers
422            self.upload_counter = upload_counter
423            self.download_counter = download_counter
424            self.insecure = insecure
425
426        def usable(self):
427            """Is it worth attempting a request?"""
428            return self._usable
429
430        def finished(self):
431            """Did the request succeed or encounter permanent failure?"""
432            return self._result['error'] == False or not self._usable
433
434        def last_result(self):
435            return self._result
436
437        def _get_user_agent(self):
438            try:
439                return self._user_agent_pool.get(block=False)
440            except queue.Empty:
441                return pycurl.Curl()
442
443        def _put_user_agent(self, ua):
444            try:
445                ua.reset()
446                self._user_agent_pool.put(ua, block=False)
447            except:
448                ua.close()
449
450        def get(self, locator, method="GET", timeout=None):
451            # locator is a KeepLocator object.
452            url = self.root + str(locator)
453            _logger.debug("Request: %s %s", method, url)
454            curl = self._get_user_agent()
455            ok = None
456            try:
457                with timer.Timer() as t:
458                    self._headers = {}
459                    response_body = BytesIO()
460                    curl.setopt(pycurl.NOSIGNAL, 1)
461                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
462                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
463                    curl.setopt(pycurl.URL, url.encode('utf-8'))
464                    curl.setopt(pycurl.HTTPHEADER, [
465                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
466                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
467                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
468                    if self.insecure:
469                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
470                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
471                    else:
472                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
473                    if method == "HEAD":
474                        curl.setopt(pycurl.NOBODY, True)
475                    else:
476                        curl.setopt(pycurl.HTTPGET, True)
477                    self._setcurltimeouts(curl, timeout, method=="HEAD")
478
479                    try:
480                        curl.perform()
481                    except Exception as e:
482                        raise arvados.errors.HttpError(0, str(e))
483                    finally:
484                        if self._socket:
485                            self._socket.close()
486                            self._socket = None
487                    self._result = {
488                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
489                        'body': response_body.getvalue(),
490                        'headers': self._headers,
491                        'error': False,
492                    }
493
494                ok = retry.check_http_response_success(self._result['status_code'])
495                if not ok:
496                    self._result['error'] = arvados.errors.HttpError(
497                        self._result['status_code'],
498                        self._headers.get('x-status-line', 'Error'))
499            except self.HTTP_ERRORS as e:
500                self._result = {
501                    'error': e,
502                }
503            self._usable = ok != False
504            if self._result.get('status_code', None):
505                # The client worked well enough to get an HTTP status
506                # code, so presumably any problems are just on the
507                # server side and it's OK to reuse the client.
508                self._put_user_agent(curl)
509            else:
510                # Don't return this client to the pool, in case it's
511                # broken.
512                curl.close()
513            if not ok:
514                _logger.debug("Request fail: GET %s => %s: %s",
515                              url, type(self._result['error']), str(self._result['error']))
516                return None
517            if method == "HEAD":
518                _logger.info("HEAD %s: %s bytes",
519                         self._result['status_code'],
520                         self._result.get('content-length'))
521                if self._result['headers'].get('x-keep-locator'):
522                    # This is a response to a remote block copy request, return
523                    # the local copy block locator.
524                    return self._result['headers'].get('x-keep-locator')
525                return True
526
527            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
528                         self._result['status_code'],
529                         len(self._result['body']),
530                         t.msecs,
531                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
532
533            if self.download_counter:
534                self.download_counter.add(len(self._result['body']))
535            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
536            if resp_md5 != locator.md5sum:
537                _logger.warning("Checksum fail: md5(%s) = %s",
538                                url, resp_md5)
539                self._result['error'] = arvados.errors.HttpError(
540                    0, 'Checksum fail')
541                return None
542            return self._result['body']
543
544        def put(self, hash_s, body, timeout=None, headers={}):
545            put_headers = copy.copy(self.put_headers)
546            put_headers.update(headers)
547            url = self.root + hash_s
548            _logger.debug("Request: PUT %s", url)
549            curl = self._get_user_agent()
550            ok = None
551            try:
552                with timer.Timer() as t:
553                    self._headers = {}
554                    body_reader = BytesIO(body)
555                    response_body = BytesIO()
556                    curl.setopt(pycurl.NOSIGNAL, 1)
557                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
558                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
559                    curl.setopt(pycurl.URL, url.encode('utf-8'))
560                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
561                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
562                    # response) instead of sending the request body immediately.
563                    # This allows the server to reject the request if the request
564                    # is invalid or the server is read-only, without waiting for
565                    # the client to send the entire block.
566                    curl.setopt(pycurl.UPLOAD, True)
567                    curl.setopt(pycurl.INFILESIZE, len(body))
568                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
569                    curl.setopt(pycurl.HTTPHEADER, [
570                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
571                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
572                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
573                    if self.insecure:
574                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
575                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
576                    else:
577                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
578                    self._setcurltimeouts(curl, timeout)
579                    try:
580                        curl.perform()
581                    except Exception as e:
582                        raise arvados.errors.HttpError(0, str(e))
583                    finally:
584                        if self._socket:
585                            self._socket.close()
586                            self._socket = None
587                    self._result = {
588                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
589                        'body': response_body.getvalue().decode('utf-8'),
590                        'headers': self._headers,
591                        'error': False,
592                    }
593                ok = retry.check_http_response_success(self._result['status_code'])
594                if not ok:
595                    self._result['error'] = arvados.errors.HttpError(
596                        self._result['status_code'],
597                        self._headers.get('x-status-line', 'Error'))
598            except self.HTTP_ERRORS as e:
599                self._result = {
600                    'error': e,
601                }
602            self._usable = ok != False # still usable if ok is True or None
603            if self._result.get('status_code', None):
604                # Client is functional. See comment in get().
605                self._put_user_agent(curl)
606            else:
607                curl.close()
608            if not ok:
609                _logger.debug("Request fail: PUT %s => %s: %s",
610                              url, type(self._result['error']), str(self._result['error']))
611                return False
612            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
613                         self._result['status_code'],
614                         len(body),
615                         t.msecs,
616                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
617            if self.upload_counter:
618                self.upload_counter.add(len(body))
619            return True

Make requests to a single Keep service, and track results.

A KeepService is intended to last long enough to perform one transaction (GET or PUT) against one Keep service. This can involve calling either get() or put() multiple times in order to retry after transient failures. However, calling both get() and put() on a single instance – or using the same instance to access two different Keep services – will not produce sensible behavior.
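
KeepService is an internal helper, but its single-transaction contract is easy to see in isolation. A sketch (the service root URL and the unsigned empty-block locator are illustrative; a real keepstore normally expects a signed locator):

    from arvados.keep import KeepClient, KeepLocator

    ks = KeepClient.KeepService('https://keep0.example.arvadosapi.com/')
    loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0')
    body = ks.get(loc)                    # block contents on success, None on failure
    if body is None:
        print('GET failed:', ks.last_result().get('error'))
    # Reuse this instance only to retry the same GET; create a new KeepService
    # for a PUT or for a different Keep service.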

KeepClient.KeepService(root, user_agent_pool=queue.LifoQueue(), upload_counter=None, download_counter=None, headers={}, insecure=False)
407        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
408                     upload_counter=None,
409                     download_counter=None,
410                     headers={},
411                     insecure=False):
412            super(KeepClient.KeepService, self).__init__()
413            self.root = root
414            self._user_agent_pool = user_agent_pool
415            self._result = {'error': None}
416            self._usable = True
417            self._session = None
418            self._socket = None
419            self.get_headers = {'Accept': 'application/octet-stream'}
420            self.get_headers.update(headers)
421            self.put_headers = headers
422            self.upload_counter = upload_counter
423            self.download_counter = download_counter
424            self.insecure = insecure
HTTP_ERRORS = (<class 'OSError'>, <class 'ssl.SSLError'>, <class 'arvados.errors.HttpError'>)
root
get_headers
put_headers
upload_counter
download_counter
insecure
def usable(self):
426        def usable(self):
427            """Is it worth attempting a request?"""
428            return self._usable

Is it worth attempting a request?

def finished(self):
430        def finished(self):
431            """Did the request succeed or encounter permanent failure?"""
432            return self._result['error'] == False or not self._usable

Did the request succeed or encounter permanent failure?

def last_result(self):
434        def last_result(self):
435            return self._result
def get(self, locator, method='GET', timeout=None):
450        def get(self, locator, method="GET", timeout=None):
451            # locator is a KeepLocator object.
452            url = self.root + str(locator)
453            _logger.debug("Request: %s %s", method, url)
454            curl = self._get_user_agent()
455            ok = None
456            try:
457                with timer.Timer() as t:
458                    self._headers = {}
459                    response_body = BytesIO()
460                    curl.setopt(pycurl.NOSIGNAL, 1)
461                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
462                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
463                    curl.setopt(pycurl.URL, url.encode('utf-8'))
464                    curl.setopt(pycurl.HTTPHEADER, [
465                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
466                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
467                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
468                    if self.insecure:
469                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
470                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
471                    else:
472                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
473                    if method == "HEAD":
474                        curl.setopt(pycurl.NOBODY, True)
475                    else:
476                        curl.setopt(pycurl.HTTPGET, True)
477                    self._setcurltimeouts(curl, timeout, method=="HEAD")
478
479                    try:
480                        curl.perform()
481                    except Exception as e:
482                        raise arvados.errors.HttpError(0, str(e))
483                    finally:
484                        if self._socket:
485                            self._socket.close()
486                            self._socket = None
487                    self._result = {
488                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
489                        'body': response_body.getvalue(),
490                        'headers': self._headers,
491                        'error': False,
492                    }
493
494                ok = retry.check_http_response_success(self._result['status_code'])
495                if not ok:
496                    self._result['error'] = arvados.errors.HttpError(
497                        self._result['status_code'],
498                        self._headers.get('x-status-line', 'Error'))
499            except self.HTTP_ERRORS as e:
500                self._result = {
501                    'error': e,
502                }
503            self._usable = ok != False
504            if self._result.get('status_code', None):
505                # The client worked well enough to get an HTTP status
506                # code, so presumably any problems are just on the
507                # server side and it's OK to reuse the client.
508                self._put_user_agent(curl)
509            else:
510                # Don't return this client to the pool, in case it's
511                # broken.
512                curl.close()
513            if not ok:
514                _logger.debug("Request fail: GET %s => %s: %s",
515                              url, type(self._result['error']), str(self._result['error']))
516                return None
517            if method == "HEAD":
518                _logger.info("HEAD %s: %s bytes",
519                         self._result['status_code'],
520                         self._result.get('content-length'))
521                if self._result['headers'].get('x-keep-locator'):
522                    # This is a response to a remote block copy request, return
523                    # the local copy block locator.
524                    return self._result['headers'].get('x-keep-locator')
525                return True
526
527            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
528                         self._result['status_code'],
529                         len(self._result['body']),
530                         t.msecs,
531                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
532
533            if self.download_counter:
534                self.download_counter.add(len(self._result['body']))
535            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
536            if resp_md5 != locator.md5sum:
537                _logger.warning("Checksum fail: md5(%s) = %s",
538                                url, resp_md5)
539                self._result['error'] = arvados.errors.HttpError(
540                    0, 'Checksum fail')
541                return None
542            return self._result['body']
def put(self, hash_s, body, timeout=None, headers={}):
544        def put(self, hash_s, body, timeout=None, headers={}):
545            put_headers = copy.copy(self.put_headers)
546            put_headers.update(headers)
547            url = self.root + hash_s
548            _logger.debug("Request: PUT %s", url)
549            curl = self._get_user_agent()
550            ok = None
551            try:
552                with timer.Timer() as t:
553                    self._headers = {}
554                    body_reader = BytesIO(body)
555                    response_body = BytesIO()
556                    curl.setopt(pycurl.NOSIGNAL, 1)
557                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
558                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
559                    curl.setopt(pycurl.URL, url.encode('utf-8'))
560                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
561                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
562                    # response) instead of sending the request body immediately.
563                    # This allows the server to reject the request if the request
564                    # is invalid or the server is read-only, without waiting for
565                    # the client to send the entire block.
566                    curl.setopt(pycurl.UPLOAD, True)
567                    curl.setopt(pycurl.INFILESIZE, len(body))
568                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
569                    curl.setopt(pycurl.HTTPHEADER, [
570                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
571                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
572                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
573                    if self.insecure:
574                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
575                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
576                    else:
577                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
578                    self._setcurltimeouts(curl, timeout)
579                    try:
580                        curl.perform()
581                    except Exception as e:
582                        raise arvados.errors.HttpError(0, str(e))
583                    finally:
584                        if self._socket:
585                            self._socket.close()
586                            self._socket = None
587                    self._result = {
588                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
589                        'body': response_body.getvalue().decode('utf-8'),
590                        'headers': self._headers,
591                        'error': False,
592                    }
593                ok = retry.check_http_response_success(self._result['status_code'])
594                if not ok:
595                    self._result['error'] = arvados.errors.HttpError(
596                        self._result['status_code'],
597                        self._headers.get('x-status-line', 'Error'))
598            except self.HTTP_ERRORS as e:
599                self._result = {
600                    'error': e,
601                }
602            self._usable = ok != False # still usable if ok is True or None
603            if self._result.get('status_code', None):
604                # Client is functional. See comment in get().
605                self._put_user_agent(curl)
606            else:
607                curl.close()
608            if not ok:
609                _logger.debug("Request fail: PUT %s => %s: %s",
610                              url, type(self._result['error']), str(self._result['error']))
611                return False
612            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
613                         self._result['status_code'],
614                         len(body),
615                         t.msecs,
616                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
617            if self.upload_counter:
618                self.upload_counter.add(len(body))
619            return True
class KeepClient.KeepWriterQueue(queue.Queue):
622    class KeepWriterQueue(queue.Queue):
623        def __init__(self, copies, classes=[]):
624            queue.Queue.__init__(self) # Old-style superclass
625            self.wanted_copies = copies
626            self.wanted_storage_classes = classes
627            self.successful_copies = 0
628            self.confirmed_storage_classes = {}
629            self.response = None
630            self.storage_classes_tracking = True
631            self.queue_data_lock = threading.RLock()
632            self.pending_tries = max(copies, len(classes))
633            self.pending_tries_notification = threading.Condition()
634
635        def write_success(self, response, replicas_nr, classes_confirmed):
636            with self.queue_data_lock:
637                self.successful_copies += replicas_nr
638                if classes_confirmed is None:
639                    self.storage_classes_tracking = False
640                elif self.storage_classes_tracking:
641                    for st_class, st_copies in classes_confirmed.items():
642                        try:
643                            self.confirmed_storage_classes[st_class] += st_copies
644                        except KeyError:
645                            self.confirmed_storage_classes[st_class] = st_copies
646                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
647                self.response = response
648            with self.pending_tries_notification:
649                self.pending_tries_notification.notify_all()
650
651        def write_fail(self, ks):
652            with self.pending_tries_notification:
653                self.pending_tries += 1
654                self.pending_tries_notification.notify()
655
656        def pending_copies(self):
657            with self.queue_data_lock:
658                return self.wanted_copies - self.successful_copies
659
660        def satisfied_classes(self):
661            with self.queue_data_lock:
662                if not self.storage_classes_tracking:
663                    # Notifies disabled storage classes expectation to
664                    # the outer loop.
665                    return None
666            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
667
668        def pending_classes(self):
669            with self.queue_data_lock:
670                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
671                    return []
672                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
673                for st_class, st_copies in self.confirmed_storage_classes.items():
674                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
675                        unsatisfied_classes.remove(st_class)
676                return unsatisfied_classes
677
678        def get_next_task(self):
679            with self.pending_tries_notification:
680                while True:
681                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
682                        # This notify_all() is unnecessary --
683                        # write_success() already called notify_all()
684                        # when pending<1 became true, so it's not
685                        # possible for any other thread to be in
686                        # wait() now -- but it's cheap insurance
687                        # against deadlock so we do it anyway:
688                        self.pending_tries_notification.notify_all()
689                        # Drain the queue and then raise Queue.Empty
690                        while True:
691                            self.get_nowait()
692                            self.task_done()
693                    elif self.pending_tries > 0:
694                        service, service_root = self.get_nowait()
695                        if service.finished():
696                            self.task_done()
697                            continue
698                        self.pending_tries -= 1
699                        return service, service_root
700                    elif self.empty():
701                        self.pending_tries_notification.notify_all()
702                        raise queue.Empty
703                    else:
704                        self.pending_tries_notification.wait()

Create a queue object with a given maximum size.

If maxsize is <= 0, the queue size is infinite.

KeepClient.KeepWriterQueue(copies, classes=[])
623        def __init__(self, copies, classes=[]):
624            queue.Queue.__init__(self) # Old-style superclass
625            self.wanted_copies = copies
626            self.wanted_storage_classes = classes
627            self.successful_copies = 0
628            self.confirmed_storage_classes = {}
629            self.response = None
630            self.storage_classes_tracking = True
631            self.queue_data_lock = threading.RLock()
632            self.pending_tries = max(copies, len(classes))
633            self.pending_tries_notification = threading.Condition()
wanted_copies
wanted_storage_classes
successful_copies
confirmed_storage_classes
response
storage_classes_tracking
queue_data_lock
pending_tries
pending_tries_notification
def write_success(self, response, replicas_nr, classes_confirmed):
635        def write_success(self, response, replicas_nr, classes_confirmed):
636            with self.queue_data_lock:
637                self.successful_copies += replicas_nr
638                if classes_confirmed is None:
639                    self.storage_classes_tracking = False
640                elif self.storage_classes_tracking:
641                    for st_class, st_copies in classes_confirmed.items():
642                        try:
643                            self.confirmed_storage_classes[st_class] += st_copies
644                        except KeyError:
645                            self.confirmed_storage_classes[st_class] = st_copies
646                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
647                self.response = response
648            with self.pending_tries_notification:
649                self.pending_tries_notification.notify_all()
def write_fail(self, ks):
651        def write_fail(self, ks):
652            with self.pending_tries_notification:
653                self.pending_tries += 1
654                self.pending_tries_notification.notify()
def pending_copies(self):
656        def pending_copies(self):
657            with self.queue_data_lock:
658                return self.wanted_copies - self.successful_copies
def satisfied_classes(self):
660        def satisfied_classes(self):
661            with self.queue_data_lock:
662                if not self.storage_classes_tracking:
663                    # Notifies disabled storage classes expectation to
664                    # the outer loop.
665                    return None
666            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
def pending_classes(self):
668        def pending_classes(self):
669            with self.queue_data_lock:
670                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
671                    return []
672                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
673                for st_class, st_copies in self.confirmed_storage_classes.items():
674                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
675                        unsatisfied_classes.remove(st_class)
676                return unsatisfied_classes
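
A standalone illustration of the accounting implemented by write_success(), pending_copies(), pending_classes() and satisfied_classes() (the storage class names are made up):

    from arvados.keep import KeepClient

    q = KeepClient.KeepWriterQueue(copies=2, classes=['foo', 'bar'])
    q.write_success(response='ok', replicas_nr=2,
                    classes_confirmed={'foo': 2, 'bar': 1})
    q.pending_copies()      # 0 (two replicas have been written)
    q.pending_classes()     # ['bar'] ('bar' still needs a second confirmed copy)
    q.satisfied_classes()   # ['foo']
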
def get_next_task(self):
678        def get_next_task(self):
679            with self.pending_tries_notification:
680                while True:
681                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
682                        # This notify_all() is unnecessary --
683                        # write_success() already called notify_all()
684                        # when pending<1 became true, so it's not
685                        # possible for any other thread to be in
686                        # wait() now -- but it's cheap insurance
687                        # against deadlock so we do it anyway:
688                        self.pending_tries_notification.notify_all()
689                        # Drain the queue and then raise Queue.Empty
690                        while True:
691                            self.get_nowait()
692                            self.task_done()
693                    elif self.pending_tries > 0:
694                        service, service_root = self.get_nowait()
695                        if service.finished():
696                            self.task_done()
697                            continue
698                        self.pending_tries -= 1
699                        return service, service_root
700                    elif self.empty():
701                        self.pending_tries_notification.notify_all()
702                        raise queue.Empty
703                    else:
704                        self.pending_tries_notification.wait()
Inherited Members
queue.Queue
maxsize
mutex
not_empty
not_full
all_tasks_done
unfinished_tasks
task_done
join
qsize
empty
full
put
get
put_nowait
get_nowait
class KeepClient.KeepWriterThreadPool:
707    class KeepWriterThreadPool(object):
708        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
709            self.total_task_nr = 0
710            if (not max_service_replicas) or (max_service_replicas >= copies):
711                num_threads = 1
712            else:
713                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
714            _logger.debug("Pool max threads is %d", num_threads)
715            self.workers = []
716            self.queue = KeepClient.KeepWriterQueue(copies, classes)
717            # Create workers
718            for _ in range(num_threads):
719                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
720                self.workers.append(w)
721
722        def add_task(self, ks, service_root):
723            self.queue.put((ks, service_root))
724            self.total_task_nr += 1
725
726        def done(self):
727            return self.queue.successful_copies, self.queue.satisfied_classes()
728
729        def join(self):
730            # Start workers
731            for worker in self.workers:
732                worker.start()
733            # Wait for finished work
734            self.queue.join()
735
736        def response(self):
737            return self.queue.response
KeepClient.KeepWriterThreadPool(data, data_hash, copies, max_service_replicas, timeout=None, classes=[])
708        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
709            self.total_task_nr = 0
710            if (not max_service_replicas) or (max_service_replicas >= copies):
711                num_threads = 1
712            else:
713                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
714            _logger.debug("Pool max threads is %d", num_threads)
715            self.workers = []
716            self.queue = KeepClient.KeepWriterQueue(copies, classes)
717            # Create workers
718            for _ in range(num_threads):
719                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
720                self.workers.append(w)
total_task_nr
workers
queue
def add_task(self, ks, service_root):
722        def add_task(self, ks, service_root):
723            self.queue.put((ks, service_root))
724            self.total_task_nr += 1
def done(self):
726        def done(self):
727            return self.queue.successful_copies, self.queue.satisfied_classes()
def join(self):
729        def join(self):
730            # Start workers
731            for worker in self.workers:
732                worker.start()
733            # Wait for finished work
734            self.queue.join()
def response(self):
736        def response(self):
737            return self.queue.response
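
The pool sizes its worker count from copies and max_service_replicas, as in the constructor above. A small standalone check of that arithmetic:

    import math

    def pool_threads(copies, max_service_replicas):
        # Mirrors the sizing logic in KeepWriterThreadPool.__init__().
        if (not max_service_replicas) or (max_service_replicas >= copies):
            return 1
        return int(math.ceil(1.0 * copies / max_service_replicas))

    assert pool_threads(2, None) == 1   # replica limit unknown: a single writer thread
    assert pool_threads(2, 2) == 1      # one service can hold both copies
    assert pool_threads(3, 1) == 3      # one replica per service: three writer threads
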
class KeepClient.KeepWriterThread(threading.Thread):
740    class KeepWriterThread(threading.Thread):
741        class TaskFailed(RuntimeError): pass
742
743        def __init__(self, queue, data, data_hash, timeout=None):
744            super(KeepClient.KeepWriterThread, self).__init__()
745            self.timeout = timeout
746            self.queue = queue
747            self.data = data
748            self.data_hash = data_hash
749            self.daemon = True
750
751        def run(self):
752            while True:
753                try:
754                    service, service_root = self.queue.get_next_task()
755                except queue.Empty:
756                    return
757                try:
758                    locator, copies, classes = self.do_task(service, service_root)
759                except Exception as e:
760                    if not isinstance(e, self.TaskFailed):
761                        _logger.exception("Exception in KeepWriterThread")
762                    self.queue.write_fail(service)
763                else:
764                    self.queue.write_success(locator, copies, classes)
765                finally:
766                    self.queue.task_done()
767
768        def do_task(self, service, service_root):
769            classes = self.queue.pending_classes()
770            headers = {}
771            if len(classes) > 0:
772                classes.sort()
773                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
774            success = bool(service.put(self.data_hash,
775                                        self.data,
776                                        timeout=self.timeout,
777                                        headers=headers))
778            result = service.last_result()
779
780            if not success:
781                if result.get('status_code'):
782                    _logger.debug("Request fail: PUT %s => %s %s",
783                                  self.data_hash,
784                                  result.get('status_code'),
785                                  result.get('body'))
786                raise self.TaskFailed()
787
788            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
789                          str(threading.current_thread()),
790                          self.data_hash,
791                          len(self.data),
792                          service_root)
793            try:
794                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
795            except (KeyError, ValueError):
796                replicas_stored = 1
797
798            classes_confirmed = {}
799            try:
800                scch = result['headers']['x-keep-storage-classes-confirmed']
801                for confirmation in scch.replace(' ', '').split(','):
802                    if '=' in confirmation:
803                        stored_class, stored_copies = confirmation.split('=')[:2]
804                        classes_confirmed[stored_class] = int(stored_copies)
805            except (KeyError, ValueError):
806                # Storage classes confirmed header missing or corrupt
807                classes_confirmed = None
808
809            return result['body'].strip(), replicas_stored, classes_confirmed

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

KeepClient.KeepWriterThread(queue, data, data_hash, timeout=None)
743        def __init__(self, queue, data, data_hash, timeout=None):
744            super(KeepClient.KeepWriterThread, self).__init__()
745            self.timeout = timeout
746            self.queue = queue
747            self.data = data
748            self.data_hash = data_hash
749            self.daemon = True

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

timeout
queue
data
data_hash
daemon
1108    @property
1109    def daemon(self):
1110        """A boolean value indicating whether this thread is a daemon thread.
1111
1112        This must be set before start() is called, otherwise RuntimeError is
1113        raised. Its initial value is inherited from the creating thread; the
1114        main thread is not a daemon thread and therefore all threads created in
1115        the main thread default to daemon = False.
1116
1117        The entire Python program exits when only daemon threads are left.
1118
1119        """
1120        assert self._initialized, "Thread.__init__() not called"
1121        return self._daemonic

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

def run(self):
751        def run(self):
752            while True:
753                try:
754                    service, service_root = self.queue.get_next_task()
755                except queue.Empty:
756                    return
757                try:
758                    locator, copies, classes = self.do_task(service, service_root)
759                except Exception as e:
760                    if not isinstance(e, self.TaskFailed):
761                        _logger.exception("Exception in KeepWriterThread")
762                    self.queue.write_fail(service)
763                else:
764                    self.queue.write_success(locator, copies, classes)
765                finally:
766                    self.queue.task_done()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

def do_task(self, service, service_root):
768        def do_task(self, service, service_root):
769            classes = self.queue.pending_classes()
770            headers = {}
771            if len(classes) > 0:
772                classes.sort()
773                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
774            success = bool(service.put(self.data_hash,
775                                        self.data,
776                                        timeout=self.timeout,
777                                        headers=headers))
778            result = service.last_result()
779
780            if not success:
781                if result.get('status_code'):
782                    _logger.debug("Request fail: PUT %s => %s %s",
783                                  self.data_hash,
784                                  result.get('status_code'),
785                                  result.get('body'))
786                raise self.TaskFailed()
787
788            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
789                          str(threading.current_thread()),
790                          self.data_hash,
791                          len(self.data),
792                          service_root)
793            try:
794                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
795            except (KeyError, ValueError):
796                replicas_stored = 1
797
798            classes_confirmed = {}
799            try:
800                scch = result['headers']['x-keep-storage-classes-confirmed']
801                for confirmation in scch.replace(' ', '').split(','):
802                    if '=' in confirmation:
803                        stored_class, stored_copies = confirmation.split('=')[:2]
804                        classes_confirmed[stored_class] = int(stored_copies)
805            except (KeyError, ValueError):
806                # Storage classes confirmed header missing or corrupt
807                classes_confirmed = None
808
809            return result['body'].strip(), replicas_stored, classes_confirmed
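
A standalone sketch of the response-header parsing at the end of do_task() (the header values are illustrative):

    headers = {
        'x-keep-replicas-stored': '2',
        'x-keep-storage-classes-confirmed': 'default=2, archival=1',
    }

    try:
        replicas_stored = int(headers['x-keep-replicas-stored'])
    except (KeyError, ValueError):
        replicas_stored = 1

    classes_confirmed = {}
    try:
        scch = headers['x-keep-storage-classes-confirmed']
        for confirmation in scch.replace(' ', '').split(','):
            if '=' in confirmation:
                stored_class, stored_copies = confirmation.split('=')[:2]
                classes_confirmed[stored_class] = int(stored_copies)
    except (KeyError, ValueError):
        # Header missing or malformed: storage class tracking is disabled.
        classes_confirmed = None

    # Result: replicas_stored == 2, classes_confirmed == {'default': 2, 'archival': 1}
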
Inherited Members
threading.Thread
start
join
name
ident
is_alive
isDaemon
setDaemon
getName
setName
native_id
class KeepClient.KeepWriterThread.TaskFailed(builtins.RuntimeError):
741        class TaskFailed(RuntimeError): pass

Unspecified run-time error.

Inherited Members
builtins.RuntimeError
RuntimeError
builtins.BaseException
with_traceback
args