arvados.keep

   1# Copyright (C) The Arvados Authors. All rights reserved.
   2#
   3# SPDX-License-Identifier: Apache-2.0
   4
   5from __future__ import absolute_import
   6from __future__ import division
   7import copy
   8from future import standard_library
   9from future.utils import native_str
  10standard_library.install_aliases()
  11from builtins import next
  12from builtins import str
  13from builtins import range
  14from builtins import object
  15import collections
  16import datetime
  17import hashlib
  18import errno
  19import io
  20import logging
  21import math
  22import os
  23import pycurl
  24import queue
  25import re
  26import socket
  27import ssl
  28import sys
  29import threading
  30import resource
  31from . import timer
  32import urllib.parse
  33import traceback
  34import weakref
  35
  36if sys.version_info >= (3, 0):
  37    from io import BytesIO
  38else:
  39    from cStringIO import StringIO as BytesIO
  40
  41import arvados
  42import arvados.config as config
  43import arvados.errors
  44import arvados.retry as retry
  45import arvados.util
  46import arvados.diskcache
  47from arvados._pycurlhelper import PyCurlHelper
  48
  49_logger = logging.getLogger('arvados.keep')
  50global_client_object = None
  51
  52
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # macOS Python builds may omit these from the socket module; define
    # them so code elsewhere in this package can configure TCP keepalive
    # uniformly (presumably used by socket setup code not visible here).
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
  62
  63
  64class KeepLocator(object):
  65    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
  66    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
  67
  68    def __init__(self, locator_str):
  69        self.hints = []
  70        self._perm_sig = None
  71        self._perm_expiry = None
  72        pieces = iter(locator_str.split('+'))
  73        self.md5sum = next(pieces)
  74        try:
  75            self.size = int(next(pieces))
  76        except StopIteration:
  77            self.size = None
  78        for hint in pieces:
  79            if self.HINT_RE.match(hint) is None:
  80                raise ValueError("invalid hint format: {}".format(hint))
  81            elif hint.startswith('A'):
  82                self.parse_permission_hint(hint)
  83            else:
  84                self.hints.append(hint)
  85
  86    def __str__(self):
  87        return '+'.join(
  88            native_str(s)
  89            for s in [self.md5sum, self.size,
  90                      self.permission_hint()] + self.hints
  91            if s is not None)
  92
  93    def stripped(self):
  94        if self.size is not None:
  95            return "%s+%i" % (self.md5sum, self.size)
  96        else:
  97            return self.md5sum
  98
  99    def _make_hex_prop(name, length):
 100        # Build and return a new property with the given name that
 101        # must be a hex string of the given length.
 102        data_name = '_{}'.format(name)
 103        def getter(self):
 104            return getattr(self, data_name)
 105        def setter(self, hex_str):
 106            if not arvados.util.is_hex(hex_str, length):
 107                raise ValueError("{} is not a {}-digit hex string: {!r}".
 108                                 format(name, length, hex_str))
 109            setattr(self, data_name, hex_str)
 110        return property(getter, setter)
 111
 112    md5sum = _make_hex_prop('md5sum', 32)
 113    perm_sig = _make_hex_prop('perm_sig', 40)
 114
 115    @property
 116    def perm_expiry(self):
 117        return self._perm_expiry
 118
 119    @perm_expiry.setter
 120    def perm_expiry(self, value):
 121        if not arvados.util.is_hex(value, 1, 8):
 122            raise ValueError(
 123                "permission timestamp must be a hex Unix timestamp: {}".
 124                format(value))
 125        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
 126
 127    def permission_hint(self):
 128        data = [self.perm_sig, self.perm_expiry]
 129        if None in data:
 130            return None
 131        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
 132        return "A{}@{:08x}".format(*data)
 133
 134    def parse_permission_hint(self, s):
 135        try:
 136            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
 137        except IndexError:
 138            raise ValueError("bad permission hint {}".format(s))
 139
 140    def permission_expired(self, as_of_dt=None):
 141        if self.perm_expiry is None:
 142            return False
 143        elif as_of_dt is None:
 144            as_of_dt = datetime.datetime.now()
 145        return self.perm_expiry <= as_of_dt
 146
 147
 148class Keep(object):
 149    """Simple interface to a global KeepClient object.
 150
 151    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
 152    own API client.  The global KeepClient will build an API client from the
 153    current Arvados configuration, which may not match the one you built.
 154    """
 155    _last_key = None
 156
 157    @classmethod
 158    def global_client_object(cls):
 159        global global_client_object
 160        # Previously, KeepClient would change its behavior at runtime based
 161        # on these configuration settings.  We simulate that behavior here
 162        # by checking the values and returning a new KeepClient if any of
 163        # them have changed.
 164        key = (config.get('ARVADOS_API_HOST'),
 165               config.get('ARVADOS_API_TOKEN'),
 166               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
 167               config.get('ARVADOS_KEEP_PROXY'),
 168               os.environ.get('KEEP_LOCAL_STORE'))
 169        if (global_client_object is None) or (cls._last_key != key):
 170            global_client_object = KeepClient()
 171            cls._last_key = key
 172        return global_client_object
 173
 174    @staticmethod
 175    def get(locator, **kwargs):
 176        return Keep.global_client_object().get(locator, **kwargs)
 177
 178    @staticmethod
 179    def put(data, **kwargs):
 180        return Keep.global_client_object().put(data, **kwargs)
 181
class KeepBlockCache(object):
    """Bounded cache of Keep block data, in RAM or backed by disk files.

    Blocks live in CacheSlot objects keyed by locator in an OrderedDict
    used as an LRU: _get() moves hits to the end, eviction pops from the
    front.  The cache is bounded both by total bytes (cache_max) and by
    slot count (_max_slots).
    """

    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
        """Create a block cache.

        cache_max: maximum total bytes to cache; 0 picks a default
            (256 MiB in RAM, or the disk-size heuristic below).
        max_slots: maximum number of cached blocks; 0 picks a default.
        disk_cache: if True, store blocks in files via arvados.diskcache.
        disk_cache_dir: disk cache directory; defaults to
            ~/.cache/arvados/keep.
        """
        self.cache_max = cache_max
        self._cache = collections.OrderedDict()
        self._cache_lock = threading.Lock()
        self._max_slots = max_slots
        self._disk_cache = disk_cache
        self._disk_cache_dir = disk_cache_dir
        # The Condition shares _cache_lock, so threads waiting in
        # reserve_cache() are woken when cap_cache() frees slots.
        self._cache_updating = threading.Condition(self._cache_lock)

        if self._disk_cache and self._disk_cache_dir is None:
            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)

        if self._max_slots == 0:
            if self._disk_cache:
                # Each block uses two file descriptors, one used to
                # open it initially and hold the flock(), and a second
                # hidden one used by mmap().
                #
                # Set max slots to 1/8 of maximum file handles.  This
                # means we'll use at most 1/4 of total file handles.
                #
                # NOFILE typically defaults to 1024 on Linux so this
                # is 128 slots (256 file handles), which means we can
                # cache up to 8 GiB of 64 MiB blocks.  This leaves
                # 768 file handles for sockets and other stuff.
                #
                # When we want the ability to have more cache (e.g. in
                # arv-mount) we'll increase rlimit before calling
                # this.
                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
            else:
                # RAM cache slots
                self._max_slots = 512

        if self.cache_max == 0:
            if self._disk_cache:
                fs = os.statvfs(self._disk_cache_dir)
                # Calculation of available space incorporates existing cache usage
                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
                # pick smallest of:
                # 10% of total disk size
                # 25% of available space
                # max_slots * 64 MiB
                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
            else:
                # 256 MiB in RAM
                self.cache_max = (256 * 1024 * 1024)

        # Never configure less than one 64 MiB block's worth of cache.
        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)

        self.cache_total = 0
        if self._disk_cache:
            # Adopt blocks left on disk by a previous run, then trim to
            # the configured limits.
            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
            for slot in self._cache.values():
                self.cache_total += slot.size()
            self.cap_cache()

    class CacheSlot(object):
        """Holder for one block's content in the RAM cache.

        A slot is created before the block is fetched; readers block in
        get() until a writer calls set().
        """
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Wait until the content has been stored, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the content once; return False if already set."""
            if self.content is not None:
                return False
            self.content = value
            self.ready.set()
            return True

        def size(self):
            """Size of the cached content in bytes (0 if not set yet)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

        def evict(self):
            """Release the cached content."""
            self.content = None


    def _resize_cache(self, cache_max, max_slots):
        # Try and make sure the contents of the cache do not exceed
        # the supplied maximums.  Caller must hold _cache_lock.

        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
            return

        # Evict in OrderedDict (LRU) order, skipping slots whose content
        # is still being fetched (ready not yet set).
        _evict_candidates = collections.deque(self._cache.values())
        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
            slot = _evict_candidates.popleft()
            if not slot.ready.is_set():
                continue

            sz = slot.size()
            slot.evict()
            self.cache_total -= sz
            del self._cache[slot.locator]


    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_updating:
            self._resize_cache(self.cache_max, self._max_slots)
            # Wake any reserve_cache() callers waiting for a free slot.
            self._cache_updating.notify_all()

    def _get(self, locator):
        # Test if the locator is already in the cache.
        # Caller must hold _cache_lock.
        if locator in self._cache:
            n = self._cache[locator]
            if n.ready.is_set() and n.content is None:
                # Slot finished but holds nothing (failed/evicted fetch);
                # drop it and report a miss.
                del self._cache[n.locator]
                return None
            # LRU bookkeeping: mark this entry most recently used.
            self._cache.move_to_end(locator)
            return n
        if self._disk_cache:
            # see if it exists on disk
            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
            if n is not None:
                self._cache[n.locator] = n
                self.cache_total += n.size()
                return n
        return None

    def get(self, locator):
        """Return the CacheSlot for locator, or None on a cache miss."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns (slot, is_new): is_new is True when the caller must fill
        the slot (via set()).'''
        with self._cache_updating:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                self._resize_cache(self.cache_max, self._max_slots-1)
                while len(self._cache) >= self._max_slots:
                    # If there isn't a slot available, need to wait
                    # for something to happen that releases one of the
                    # cache slots.  Idle for 200 ms or woken up by
                    # another thread
                    self._cache_updating.wait(timeout=0.2)
                    self._resize_cache(self.cache_max, self._max_slots-1)

                if self._disk_cache:
                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
                else:
                    n = KeepBlockCache.CacheSlot(locator)
                self._cache[n.locator] = n
                return n, True

    def set(self, slot, blob):
        """Store blob into slot and update cache accounting.

        On ENOMEM/ENOSPC from the disk cache, shrink the limits, evict,
        and retry once; if the retry also fails, mark the slot empty and
        raise KeepCacheError.
        """
        try:
            if slot.set(blob):
                self.cache_total += slot.size()
            # The finally block below still runs cap_cache() before
            # this returns.
            return
        except OSError as e:
            if e.errno == errno.ENOMEM:
                # Reduce max slots to current - 4, cap cache and retry
                with self._cache_lock:
                    self._max_slots = max(4, len(self._cache) - 4)
            elif e.errno == errno.ENOSPC:
                # Reduce disk max space to current - 256 MiB, cap cache and retry
                with self._cache_lock:
                    sm = sum(st.size() for st in self._cache.values())
                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
            elif e.errno == errno.ENODEV:
                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
        except Exception as e:
            # Deliberate best-effort: any other first-attempt error falls
            # through to the retry below, which raises if it recurs.
            pass
        finally:
            # Check if we should evict things from the cache.  Either
            # because we added a new thing or there was an error and
            # we possibly adjusted the limits down, so we might need
            # to push something out.
            self.cap_cache()

        try:
            # Only gets here if there was an error the first time. The
            # exception handler adjusts limits downward in some cases
            # to free up resources, which would make the operation
            # succeed.
            if slot.set(blob):
                self.cache_total += slot.size()
        except Exception as e:
            # It failed again.  Give up.
            slot.set(None)
            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))

        self.cap_cache()
 383
 384class Counter(object):
 385    def __init__(self, v=0):
 386        self._lk = threading.Lock()
 387        self._val = v
 388
 389    def add(self, v):
 390        with self._lk:
 391            self._val += v
 392
 393    def get(self):
 394        with self._lk:
 395            return self._val
 396
 397
 398class KeepClient(object):
    # Request timeout defaults taken from the shared PyCurl helper, so
    # this client and the helper agree on them.
    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
 401
 402    class KeepService(PyCurlHelper):
 403        """Make requests to a single Keep service, and track results.
 404
 405        A KeepService is intended to last long enough to perform one
 406        transaction (GET or PUT) against one Keep service. This can
 407        involve calling either get() or put() multiple times in order
 408        to retry after transient failures. However, calling both get()
 409        and put() on a single instance -- or using the same instance
 410        to access two different Keep services -- will not produce
 411        sensible behavior.
 412        """
 413
 414        HTTP_ERRORS = (
 415            socket.error,
 416            ssl.SSLError,
 417            arvados.errors.HttpError,
 418        )
 419
 420        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
 421                     upload_counter=None,
 422                     download_counter=None,
 423                     headers={},
 424                     insecure=False):
 425            super(KeepClient.KeepService, self).__init__()
 426            self.root = root
 427            self._user_agent_pool = user_agent_pool
 428            self._result = {'error': None}
 429            self._usable = True
 430            self._session = None
 431            self._socket = None
 432            self.get_headers = {'Accept': 'application/octet-stream'}
 433            self.get_headers.update(headers)
 434            self.put_headers = headers
 435            self.upload_counter = upload_counter
 436            self.download_counter = download_counter
 437            self.insecure = insecure
 438
 439        def usable(self):
 440            """Is it worth attempting a request?"""
 441            return self._usable
 442
 443        def finished(self):
 444            """Did the request succeed or encounter permanent failure?"""
 445            return self._result['error'] == False or not self._usable
 446
 447        def last_result(self):
 448            return self._result
 449
 450        def _get_user_agent(self):
 451            try:
 452                return self._user_agent_pool.get(block=False)
 453            except queue.Empty:
 454                return pycurl.Curl()
 455
 456        def _put_user_agent(self, ua):
 457            try:
 458                ua.reset()
 459                self._user_agent_pool.put(ua, block=False)
 460            except:
 461                ua.close()
 462
 463        def get(self, locator, method="GET", timeout=None):
 464            # locator is a KeepLocator object.
 465            url = self.root + str(locator)
 466            _logger.debug("Request: %s %s", method, url)
 467            curl = self._get_user_agent()
 468            ok = None
 469            try:
 470                with timer.Timer() as t:
 471                    self._headers = {}
 472                    response_body = BytesIO()
 473                    curl.setopt(pycurl.NOSIGNAL, 1)
 474                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 475                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 476                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 477                    curl.setopt(pycurl.HTTPHEADER, [
 478                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
 479                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 480                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 481                    if self.insecure:
 482                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 483                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 484                    else:
 485                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 486                    if method == "HEAD":
 487                        curl.setopt(pycurl.NOBODY, True)
 488                    else:
 489                        curl.setopt(pycurl.HTTPGET, True)
 490                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 491
 492                    try:
 493                        curl.perform()
 494                    except Exception as e:
 495                        raise arvados.errors.HttpError(0, str(e))
 496                    finally:
 497                        if self._socket:
 498                            self._socket.close()
 499                            self._socket = None
 500                    self._result = {
 501                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 502                        'body': response_body.getvalue(),
 503                        'headers': self._headers,
 504                        'error': False,
 505                    }
 506
 507                ok = retry.check_http_response_success(self._result['status_code'])
 508                if not ok:
 509                    self._result['error'] = arvados.errors.HttpError(
 510                        self._result['status_code'],
 511                        self._headers.get('x-status-line', 'Error'))
 512            except self.HTTP_ERRORS as e:
 513                self._result = {
 514                    'error': e,
 515                }
 516            self._usable = ok != False
 517            if self._result.get('status_code', None):
 518                # The client worked well enough to get an HTTP status
 519                # code, so presumably any problems are just on the
 520                # server side and it's OK to reuse the client.
 521                self._put_user_agent(curl)
 522            else:
 523                # Don't return this client to the pool, in case it's
 524                # broken.
 525                curl.close()
 526            if not ok:
 527                _logger.debug("Request fail: GET %s => %s: %s",
 528                              url, type(self._result['error']), str(self._result['error']))
 529                return None
 530            if method == "HEAD":
 531                _logger.info("HEAD %s: %s bytes",
 532                         self._result['status_code'],
 533                         self._result.get('content-length'))
 534                if self._result['headers'].get('x-keep-locator'):
 535                    # This is a response to a remote block copy request, return
 536                    # the local copy block locator.
 537                    return self._result['headers'].get('x-keep-locator')
 538                return True
 539
 540            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
 541                         self._result['status_code'],
 542                         len(self._result['body']),
 543                         t.msecs,
 544                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 545
 546            if self.download_counter:
 547                self.download_counter.add(len(self._result['body']))
 548            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
 549            if resp_md5 != locator.md5sum:
 550                _logger.warning("Checksum fail: md5(%s) = %s",
 551                                url, resp_md5)
 552                self._result['error'] = arvados.errors.HttpError(
 553                    0, 'Checksum fail')
 554                return None
 555            return self._result['body']
 556
 557        def put(self, hash_s, body, timeout=None, headers={}):
 558            put_headers = copy.copy(self.put_headers)
 559            put_headers.update(headers)
 560            url = self.root + hash_s
 561            _logger.debug("Request: PUT %s", url)
 562            curl = self._get_user_agent()
 563            ok = None
 564            try:
 565                with timer.Timer() as t:
 566                    self._headers = {}
 567                    body_reader = BytesIO(body)
 568                    response_body = BytesIO()
 569                    curl.setopt(pycurl.NOSIGNAL, 1)
 570                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 571                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 572                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 573                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
 574                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
 575                    # response) instead of sending the request body immediately.
 576                    # This allows the server to reject the request if the request
 577                    # is invalid or the server is read-only, without waiting for
 578                    # the client to send the entire block.
 579                    curl.setopt(pycurl.UPLOAD, True)
 580                    curl.setopt(pycurl.INFILESIZE, len(body))
 581                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
 582                    curl.setopt(pycurl.HTTPHEADER, [
 583                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
 584                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 585                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 586                    if self.insecure:
 587                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 588                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 589                    else:
 590                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 591                    self._setcurltimeouts(curl, timeout)
 592                    try:
 593                        curl.perform()
 594                    except Exception as e:
 595                        raise arvados.errors.HttpError(0, str(e))
 596                    finally:
 597                        if self._socket:
 598                            self._socket.close()
 599                            self._socket = None
 600                    self._result = {
 601                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 602                        'body': response_body.getvalue().decode('utf-8'),
 603                        'headers': self._headers,
 604                        'error': False,
 605                    }
 606                ok = retry.check_http_response_success(self._result['status_code'])
 607                if not ok:
 608                    self._result['error'] = arvados.errors.HttpError(
 609                        self._result['status_code'],
 610                        self._headers.get('x-status-line', 'Error'))
 611            except self.HTTP_ERRORS as e:
 612                self._result = {
 613                    'error': e,
 614                }
 615            self._usable = ok != False # still usable if ok is True or None
 616            if self._result.get('status_code', None):
 617                # Client is functional. See comment in get().
 618                self._put_user_agent(curl)
 619            else:
 620                curl.close()
 621            if not ok:
 622                _logger.debug("Request fail: PUT %s => %s: %s",
 623                              url, type(self._result['error']), str(self._result['error']))
 624                return False
 625            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
 626                         self._result['status_code'],
 627                         len(body),
 628                         t.msecs,
 629                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
 630            if self.upload_counter:
 631                self.upload_counter.add(len(body))
 632            return True
 633
 634
    class KeepWriterQueue(queue.Queue):
        """Work queue of (KeepService, service_root) write targets.

        Tracks how many replicas (and which storage classes) have been
        confirmed so far, so writer threads know when the goal is met.
        """

        def __init__(self, copies, classes=[]):
            """copies: replicas wanted; classes: storage classes wanted."""
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            # Maps storage class name -> confirmed replica count.
            self.confirmed_storage_classes = {}
            self.response = None
            # Set False when a server doesn't report storage class counts.
            self.storage_classes_tracking = True
            # RLock: pending_classes() may be re-entered while held.
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            """Record a successful write of replicas_nr copies.

            classes_confirmed maps class name -> confirmed copy count,
            or is None when the server doesn't report class counts.
            """
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            """Record a failed write; allow one more attempt by a worker."""
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            """Number of replicas still needed."""
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            """Storage classes fully satisfied so far, or None if untracked."""
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
            # NOTE: intentionally outside the `with` above;
            # pending_classes() re-acquires the same RLock.
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            """Storage classes still lacking wanted_copies confirmations."""
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            """Block until a write task is available, then return it.

            Returns (service, service_root).  Raises queue.Empty when the
            write goal has been reached or no more services remain.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            # This service already succeeded or failed
                            # permanently; skip it.
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        # Tasks exist but no tries are pending: wait for
                        # write_success()/write_fail() to change state.
                        self.pending_tries_notification.wait()
 718
 719
 720    class KeepWriterThreadPool(object):
 721        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
 722            self.total_task_nr = 0
 723            if (not max_service_replicas) or (max_service_replicas >= copies):
 724                num_threads = 1
 725            else:
 726                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
 727            _logger.debug("Pool max threads is %d", num_threads)
 728            self.workers = []
 729            self.queue = KeepClient.KeepWriterQueue(copies, classes)
 730            # Create workers
 731            for _ in range(num_threads):
 732                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
 733                self.workers.append(w)
 734
 735        def add_task(self, ks, service_root):
 736            self.queue.put((ks, service_root))
 737            self.total_task_nr += 1
 738
 739        def done(self):
 740            return self.queue.successful_copies, self.queue.satisfied_classes()
 741
 742        def join(self):
 743            # Start workers
 744            for worker in self.workers:
 745                worker.start()
 746            # Wait for finished work
 747            self.queue.join()
 748
 749        def response(self):
 750            return self.queue.response
 751
 752
 753    class KeepWriterThread(threading.Thread):
 754        class TaskFailed(RuntimeError): pass
 755
 756        def __init__(self, queue, data, data_hash, timeout=None):
 757            super(KeepClient.KeepWriterThread, self).__init__()
 758            self.timeout = timeout
 759            self.queue = queue
 760            self.data = data
 761            self.data_hash = data_hash
 762            self.daemon = True
 763
 764        def run(self):
 765            while True:
 766                try:
 767                    service, service_root = self.queue.get_next_task()
 768                except queue.Empty:
 769                    return
 770                try:
 771                    locator, copies, classes = self.do_task(service, service_root)
 772                except Exception as e:
 773                    if not isinstance(e, self.TaskFailed):
 774                        _logger.exception("Exception in KeepWriterThread")
 775                    self.queue.write_fail(service)
 776                else:
 777                    self.queue.write_success(locator, copies, classes)
 778                finally:
 779                    self.queue.task_done()
 780
 781        def do_task(self, service, service_root):
 782            classes = self.queue.pending_classes()
 783            headers = {}
 784            if len(classes) > 0:
 785                classes.sort()
 786                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
 787            success = bool(service.put(self.data_hash,
 788                                        self.data,
 789                                        timeout=self.timeout,
 790                                        headers=headers))
 791            result = service.last_result()
 792
 793            if not success:
 794                if result.get('status_code'):
 795                    _logger.debug("Request fail: PUT %s => %s %s",
 796                                  self.data_hash,
 797                                  result.get('status_code'),
 798                                  result.get('body'))
 799                raise self.TaskFailed()
 800
 801            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
 802                          str(threading.current_thread()),
 803                          self.data_hash,
 804                          len(self.data),
 805                          service_root)
 806            try:
 807                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
 808            except (KeyError, ValueError):
 809                replicas_stored = 1
 810
 811            classes_confirmed = {}
 812            try:
 813                scch = result['headers']['x-keep-storage-classes-confirmed']
 814                for confirmation in scch.replace(' ', '').split(','):
 815                    if '=' in confirmation:
 816                        stored_class, stored_copies = confirmation.split('=')[:2]
 817                        classes_confirmed[stored_class] = int(stored_copies)
 818            except (KeyError, ValueError):
 819                # Storage classes confirmed header missing or corrupt
 820                classes_confirmed = None
 821
 822            return result['body'].strip(), replicas_stored, classes_confirmed
 823
 824
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want KeepClient not to use a proxy, pass in an empty
          string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :block_cache:
          A KeepBlockCache to use for this client.  If not provided, a
          default cache is created.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.

        :session:
          Accepted for backward compatibility; not referenced in this
          constructor.

        :num_prefetch_threads:
          Number of background block-prefetch threads.  Default 2.
        """
        self.lock = threading.Lock()
        if proxy is None:
            # ARVADOS_KEEP_SERVICES (a space-separated list of URLs)
            # takes precedence over the single-URL ARVADOS_KEEP_PROXY.
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable user-agent (curl) objects shared by KeepService
        # instances.
        self._user_agent_pool = queue.LifoQueue()
        # Statistics counters exposed for monitoring.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        # Set once we warn that the cluster ignores storage-class headers,
        # so the warning is only logged one time.
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        # Prefetch machinery is created lazily by _start_prefetch_threads().
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            # Local-store mode (mainly for tests): swap the network methods
            # for filesystem-backed equivalents.
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                # Build a synthetic, static service list from the proxy URIs.
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
 969
 970    def current_timeout(self, attempt_number):
 971        """Return the appropriate timeout to use for this client.
 972
 973        The proxy timeout setting if the backend service is currently a proxy,
 974        the regular timeout setting otherwise.  The `attempt_number` indicates
 975        how many times the operation has been tried already (starting from 0
 976        for the first try), and scales the connection timeout portion of the
 977        return value accordingly.
 978
 979        """
 980        # TODO(twp): the timeout should be a property of a
 981        # KeepService, not a KeepClient. See #4488.
 982        t = self.proxy_timeout if self.using_proxy else self.timeout
 983        if len(t) == 2:
 984            return (t[0] * (1 << attempt_number), t[1])
 985        else:
 986            return (t[0] * (1 << attempt_number), t[1], t[2])
 987    def _any_nondisk_services(self, service_list):
 988        return any(ks.get('service_type', 'disk') != 'disk'
 989                   for ks in service_list)
 990
    def build_services_list(self, force_rebuild=False):
        """Populate the cached Keep service lists from the API server.

        No-op when a static (proxy-derived) list is in use, or when a
        list already exists and force_rebuild is False.  Fills in
        _gateway_services, _keep_services, _writable_services and
        max_replicas_per_service under self.lock.

        Raises arvados.errors.NoKeepServersError if the API server
        reports no accessible services.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Gateway-type services are addressable only by UUID hint, so
            # exclude them from the general-purpose service list.
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
1033
1034    def _service_weight(self, data_hash, service_uuid):
1035        """Compute the weight of a Keep service endpoint for a data
1036        block with a known hash.
1037
1038        The weight is md5(h + u) where u is the last 15 characters of
1039        the service endpoint's UUID.
1040        """
1041        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1042
1043    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1044        """Return an array of Keep service endpoints, in the order in
1045        which they should be probed when reading or writing data with
1046        the given hash+hints.
1047        """
1048        self.build_services_list(force_rebuild)
1049
1050        sorted_roots = []
1051        # Use the services indicated by the given +K@... remote
1052        # service hints, if any are present and can be resolved to a
1053        # URI.
1054        for hint in locator.hints:
1055            if hint.startswith('K@'):
1056                if len(hint) == 7:
1057                    sorted_roots.append(
1058                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1059                elif len(hint) == 29:
1060                    svc = self._gateway_services.get(hint[2:])
1061                    if svc:
1062                        sorted_roots.append(svc['_service_root'])
1063
1064        # Sort the available local services by weight (heaviest first)
1065        # for this locator, and return their service_roots (base URIs)
1066        # in that order.
1067        use_services = self._keep_services
1068        if need_writable:
1069            use_services = self._writable_services
1070        self.using_proxy = self._any_nondisk_services(use_services)
1071        sorted_roots.extend([
1072            svc['_service_root'] for svc in sorted(
1073                use_services,
1074                reverse=True,
1075                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1076        _logger.debug("{}: {}".format(locator, sorted_roots))
1077        return sorted_roots
1078
1079    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1080        # roots_map is a dictionary, mapping Keep service root strings
1081        # to KeepService objects.  Poll for Keep services, and add any
1082        # new ones to roots_map.  Return the current list of local
1083        # root strings.
1084        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1085        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1086        for root in local_roots:
1087            if root not in roots_map:
1088                roots_map[root] = self.KeepService(
1089                    root, self._user_agent_pool,
1090                    upload_counter=self.upload_counter,
1091                    download_counter=self.download_counter,
1092                    headers=headers,
1093                    insecure=self.insecure)
1094        return local_roots
1095
1096    @staticmethod
1097    def _check_loop_result(result):
1098        # KeepClient RetryLoops should save results as a 2-tuple: the
1099        # actual result of the request, and the number of servers available
1100        # to receive the request this round.
1101        # This method returns True if there's a real result, False if
1102        # there are no more servers available, otherwise None.
1103        if isinstance(result, Exception):
1104            return None
1105        result, tried_server_count = result
1106        if (result is not None) and (result is not False):
1107            return True
1108        elif tried_server_count < 1:
1109            _logger.info("No more Keep services to try; giving up")
1110            return False
1111        else:
1112            return None
1113
1114    def get_from_cache(self, loc_s):
1115        """Fetch a block only if is in the cache, otherwise return None."""
1116        locator = KeepLocator(loc_s)
1117        slot = self.block_cache.get(locator.md5sum)
1118        if slot is not None and slot.ready.is_set():
1119            return slot.get()
1120        else:
1121            return None
1122
1123    def refresh_signature(self, loc):
1124        """Ask Keep to get the remote block and return its local signature"""
1125        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1126        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1127
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Send a HEAD request for a block; see _get_or_head for arguments."""
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1131
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Fetch a block's content from Keep; see _get_or_head for arguments."""
        return self._get_or_head(loc_s, method="GET", **kwargs)
1135
    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        * request_id: X-Request-Id header value; generated when omitted.
        * headers: extra HTTP headers to send with each request.
        * prefetch: when True, treat this as a background cache fill and
          return immediately if another fetch is already in flight.
        """
        if ',' in loc_s:
            # NOTE(review): self.get() returns bytes under Python 3, so
            # ''.join() over the results would raise TypeError -- confirm
            # whether this multi-locator path is still exercised.
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                # Reserve (or find) the cache slot for this block before
                # doing any network work.
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # this is request for a prefetch to fill in
                        # the cache, don't need to wait for the
                        # result, so if it is already in flight return
                        # immediately.  Clear 'slot' to prevent
                        # finally block from calling slot.set()
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            # NOTE(review): this reset discards the KeepService objects just
            # built from hint_roots above; map_new_services() re-registers
            # hint roots each round, so behavior appears unaffected, but
            # confirm whether the hint-based roots_map construction above is
            # still needed or the reset should be removed.
            roots_map = {}
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            # Publish whatever we got (possibly None) so other threads
            # waiting on this slot are released.
            if slot is not None:
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1285
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        # Fall back to the cluster's default storage classes.
        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            # Nothing to store; the bare locator is still a valid answer.
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # Progress accumulated across retry rounds.  done_classes becomes
        # None if the cluster doesn't support storage-class headers.
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Only request the storage classes not yet satisfied.
            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done_copies,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left),
                                                        classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                    writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1391
1392    def _block_prefetch_worker(self):
1393        """The background downloader thread."""
1394        while True:
1395            try:
1396                b = self._prefetch_queue.get()
1397                if b is None:
1398                    return
1399                self.get(b, prefetch=True)
1400            except Exception:
1401                _logger.exception("Exception doing block prefetch")
1402
1403    def _start_prefetch_threads(self):
1404        if self._prefetch_threads is None:
1405            with self.lock:
1406                if self._prefetch_threads is not None:
1407                    return
1408                self._prefetch_queue = queue.Queue()
1409                self._prefetch_threads = []
1410                for i in range(0, self.num_prefetch_threads):
1411                    thread = threading.Thread(target=self._block_prefetch_worker)
1412                    self._prefetch_threads.append(thread)
1413                    thread.daemon = True
1414                    thread.start()
1415
1416    def block_prefetch(self, locator):
1417        """
1418        This relies on the fact that KeepClient implements a block cache,
1419        so repeated requests for the same block will not result in repeated
1420        downloads (unless the block is evicted from the cache.)  This method
1421        does not block.
1422        """
1423
1424        if self.block_cache.get(locator) is not None:
1425            return
1426
1427        self._start_prefetch_threads()
1428        self._prefetch_queue.put(locator)
1429
1430    def stop_prefetch_threads(self):
1431        with self.lock:
1432            if self._prefetch_threads is not None:
1433                for t in self._prefetch_threads:
1434                    self._prefetch_queue.put(None)
1435                for t in self._prefetch_threads:
1436                    t.join()
1437            self._prefetch_threads = None
1438            self._prefetch_queue = None
1439
1440    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1441        """A stub for put().
1442
1443        This method is used in place of the real put() method when
1444        using local storage (see constructor's local_store argument).
1445
1446        copies and num_retries arguments are ignored: they are here
1447        only for the sake of offering the same call signature as
1448        put().
1449
1450        Data stored this way can be retrieved via local_store_get().
1451        """
1452        md5 = hashlib.md5(data).hexdigest()
1453        locator = '%s+%d' % (md5, len(data))
1454        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1455            f.write(data)
1456        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1457                  os.path.join(self.local_store, md5))
1458        return locator
1459
1460    def local_store_get(self, loc_s, num_retries=None):
1461        """Companion to local_store_put()."""
1462        try:
1463            locator = KeepLocator(loc_s)
1464        except ValueError:
1465            raise arvados.errors.NotFoundError(
1466                "Invalid data locator: '%s'" % loc_s)
1467        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1468            return b''
1469        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1470            return f.read()
1471
1472    def local_store_head(self, loc_s, num_retries=None):
1473        """Companion to local_store_put()."""
1474        try:
1475            locator = KeepLocator(loc_s)
1476        except ValueError:
1477            raise arvados.errors.NotFoundError(
1478                "Invalid data locator: '%s'" % loc_s)
1479        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1480            return True
1481        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1482            return True
global_client_object = None
class KeepLocator:
 65class KeepLocator(object):
 66    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
 67    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
 68
 69    def __init__(self, locator_str):
 70        self.hints = []
 71        self._perm_sig = None
 72        self._perm_expiry = None
 73        pieces = iter(locator_str.split('+'))
 74        self.md5sum = next(pieces)
 75        try:
 76            self.size = int(next(pieces))
 77        except StopIteration:
 78            self.size = None
 79        for hint in pieces:
 80            if self.HINT_RE.match(hint) is None:
 81                raise ValueError("invalid hint format: {}".format(hint))
 82            elif hint.startswith('A'):
 83                self.parse_permission_hint(hint)
 84            else:
 85                self.hints.append(hint)
 86
 87    def __str__(self):
 88        return '+'.join(
 89            native_str(s)
 90            for s in [self.md5sum, self.size,
 91                      self.permission_hint()] + self.hints
 92            if s is not None)
 93
 94    def stripped(self):
 95        if self.size is not None:
 96            return "%s+%i" % (self.md5sum, self.size)
 97        else:
 98            return self.md5sum
 99
100    def _make_hex_prop(name, length):
101        # Build and return a new property with the given name that
102        # must be a hex string of the given length.
103        data_name = '_{}'.format(name)
104        def getter(self):
105            return getattr(self, data_name)
106        def setter(self, hex_str):
107            if not arvados.util.is_hex(hex_str, length):
108                raise ValueError("{} is not a {}-digit hex string: {!r}".
109                                 format(name, length, hex_str))
110            setattr(self, data_name, hex_str)
111        return property(getter, setter)
112
113    md5sum = _make_hex_prop('md5sum', 32)
114    perm_sig = _make_hex_prop('perm_sig', 40)
115
116    @property
117    def perm_expiry(self):
118        return self._perm_expiry
119
120    @perm_expiry.setter
121    def perm_expiry(self, value):
122        if not arvados.util.is_hex(value, 1, 8):
123            raise ValueError(
124                "permission timestamp must be a hex Unix timestamp: {}".
125                format(value))
126        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
127
128    def permission_hint(self):
129        data = [self.perm_sig, self.perm_expiry]
130        if None in data:
131            return None
132        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
133        return "A{}@{:08x}".format(*data)
134
135    def parse_permission_hint(self, s):
136        try:
137            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
138        except IndexError:
139            raise ValueError("bad permission hint {}".format(s))
140
141    def permission_expired(self, as_of_dt=None):
142        if self.perm_expiry is None:
143            return False
144        elif as_of_dt is None:
145            as_of_dt = datetime.datetime.now()
146        return self.perm_expiry <= as_of_dt
KeepLocator(locator_str)
69    def __init__(self, locator_str):
70        self.hints = []
71        self._perm_sig = None
72        self._perm_expiry = None
73        pieces = iter(locator_str.split('+'))
74        self.md5sum = next(pieces)
75        try:
76            self.size = int(next(pieces))
77        except StopIteration:
78            self.size = None
79        for hint in pieces:
80            if self.HINT_RE.match(hint) is None:
81                raise ValueError("invalid hint format: {}".format(hint))
82            elif hint.startswith('A'):
83                self.parse_permission_hint(hint)
84            else:
85                self.hints.append(hint)
EPOCH_DATETIME = datetime.datetime(1970, 1, 1, 0, 0)
HINT_RE = re.compile('^[A-Z][A-Za-z0-9@_-]+$')
hints
md5sum
104        def getter(self):
105            return getattr(self, data_name)
def stripped(self):
94    def stripped(self):
95        if self.size is not None:
96            return "%s+%i" % (self.md5sum, self.size)
97        else:
98            return self.md5sum
perm_sig
104        def getter(self):
105            return getattr(self, data_name)
perm_expiry
116    @property
117    def perm_expiry(self):
118        return self._perm_expiry
def permission_hint(self):
128    def permission_hint(self):
129        data = [self.perm_sig, self.perm_expiry]
130        if None in data:
131            return None
132        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
133        return "A{}@{:08x}".format(*data)
def parse_permission_hint(self, s):
135    def parse_permission_hint(self, s):
136        try:
137            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
138        except IndexError:
139            raise ValueError("bad permission hint {}".format(s))
def permission_expired(self, as_of_dt=None):
141    def permission_expired(self, as_of_dt=None):
142        if self.perm_expiry is None:
143            return False
144        elif as_of_dt is None:
145            as_of_dt = datetime.datetime.now()
146        return self.perm_expiry <= as_of_dt
class Keep:
149class Keep(object):
150    """Simple interface to a global KeepClient object.
151
152    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
153    own API client.  The global KeepClient will build an API client from the
154    current Arvados configuration, which may not match the one you built.
155    """
156    _last_key = None
157
158    @classmethod
159    def global_client_object(cls):
160        global global_client_object
161        # Previously, KeepClient would change its behavior at runtime based
162        # on these configuration settings.  We simulate that behavior here
163        # by checking the values and returning a new KeepClient if any of
164        # them have changed.
165        key = (config.get('ARVADOS_API_HOST'),
166               config.get('ARVADOS_API_TOKEN'),
167               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
168               config.get('ARVADOS_KEEP_PROXY'),
169               os.environ.get('KEEP_LOCAL_STORE'))
170        if (global_client_object is None) or (cls._last_key != key):
171            global_client_object = KeepClient()
172            cls._last_key = key
173        return global_client_object
174
175    @staticmethod
176    def get(locator, **kwargs):
177        return Keep.global_client_object().get(locator, **kwargs)
178
179    @staticmethod
180    def put(data, **kwargs):
181        return Keep.global_client_object().put(data, **kwargs)

Simple interface to a global KeepClient object.

THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your own API client. The global KeepClient will build an API client from the current Arvados configuration, which may not match the one you built.

@classmethod
def global_client_object(cls):
158    @classmethod
159    def global_client_object(cls):
160        global global_client_object
161        # Previously, KeepClient would change its behavior at runtime based
162        # on these configuration settings.  We simulate that behavior here
163        # by checking the values and returning a new KeepClient if any of
164        # them have changed.
165        key = (config.get('ARVADOS_API_HOST'),
166               config.get('ARVADOS_API_TOKEN'),
167               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
168               config.get('ARVADOS_KEEP_PROXY'),
169               os.environ.get('KEEP_LOCAL_STORE'))
170        if (global_client_object is None) or (cls._last_key != key):
171            global_client_object = KeepClient()
172            cls._last_key = key
173        return global_client_object
@staticmethod
def get(locator, **kwargs):
175    @staticmethod
176    def get(locator, **kwargs):
177        return Keep.global_client_object().get(locator, **kwargs)
@staticmethod
def put(data, **kwargs):
179    @staticmethod
180    def put(data, **kwargs):
181        return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache:
183class KeepBlockCache(object):
184    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
185        self.cache_max = cache_max
186        self._cache = collections.OrderedDict()
187        self._cache_lock = threading.Lock()
188        self._max_slots = max_slots
189        self._disk_cache = disk_cache
190        self._disk_cache_dir = disk_cache_dir
191        self._cache_updating = threading.Condition(self._cache_lock)
192
193        if self._disk_cache and self._disk_cache_dir is None:
194            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
195            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
196
197        if self._max_slots == 0:
198            if self._disk_cache:
199                # Each block uses two file descriptors, one used to
200                # open it initially and hold the flock(), and a second
201                # hidden one used by mmap().
202                #
203                # Set max slots to 1/8 of maximum file handles.  This
204                # means we'll use at most 1/4 of total file handles.
205                #
206                # NOFILE typically defaults to 1024 on Linux so this
207                # is 128 slots (256 file handles), which means we can
208                # cache up to 8 GiB of 64 MiB blocks.  This leaves
209                # 768 file handles for sockets and other stuff.
210                #
211                # When we want the ability to have more cache (e.g. in
212                # arv-mount) we'll increase rlimit before calling
213                # this.
214                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
215            else:
216                # RAM cache slots
217                self._max_slots = 512
218
219        if self.cache_max == 0:
220            if self._disk_cache:
221                fs = os.statvfs(self._disk_cache_dir)
222                # Calculation of available space incorporates existing cache usage
223                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
224                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
225                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
226                # pick smallest of:
227                # 10% of total disk size
228                # 25% of available space
229                # max_slots * 64 MiB
230                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
231            else:
232                # 256 MiB in RAM
233                self.cache_max = (256 * 1024 * 1024)
234
235        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
236
237        self.cache_total = 0
238        if self._disk_cache:
239            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
240            for slot in self._cache.values():
241                self.cache_total += slot.size()
242            self.cap_cache()
243
244    class CacheSlot(object):
245        __slots__ = ("locator", "ready", "content")
246
247        def __init__(self, locator):
248            self.locator = locator
249            self.ready = threading.Event()
250            self.content = None
251
252        def get(self):
253            self.ready.wait()
254            return self.content
255
256        def set(self, value):
257            if self.content is not None:
258                return False
259            self.content = value
260            self.ready.set()
261            return True
262
263        def size(self):
264            if self.content is None:
265                return 0
266            else:
267                return len(self.content)
268
269        def evict(self):
270            self.content = None
271
272
273    def _resize_cache(self, cache_max, max_slots):
274        # Try and make sure the contents of the cache do not exceed
275        # the supplied maximums.
276
277        if self.cache_total <= cache_max and len(self._cache) <= max_slots:
278            return
279
280        _evict_candidates = collections.deque(self._cache.values())
281        while _evict_candidates and (self.cache_total > cache_max or len(self._cache) > max_slots):
282            slot = _evict_candidates.popleft()
283            if not slot.ready.is_set():
284                continue
285
286            sz = slot.size()
287            slot.evict()
288            self.cache_total -= sz
289            del self._cache[slot.locator]
290
291
292    def cap_cache(self):
293        '''Cap the cache size to self.cache_max'''
294        with self._cache_updating:
295            self._resize_cache(self.cache_max, self._max_slots)
296            self._cache_updating.notify_all()
297
298    def _get(self, locator):
299        # Test if the locator is already in the cache
300        if locator in self._cache:
301            n = self._cache[locator]
302            if n.ready.is_set() and n.content is None:
303                del self._cache[n.locator]
304                return None
305            self._cache.move_to_end(locator)
306            return n
307        if self._disk_cache:
308            # see if it exists on disk
309            n = arvados.diskcache.DiskCacheSlot.get_from_disk(locator, self._disk_cache_dir)
310            if n is not None:
311                self._cache[n.locator] = n
312                self.cache_total += n.size()
313                return n
314        return None
315
316    def get(self, locator):
317        with self._cache_lock:
318            return self._get(locator)
319
320    def reserve_cache(self, locator):
321        '''Reserve a cache slot for the specified locator,
322        or return the existing slot.'''
323        with self._cache_updating:
324            n = self._get(locator)
325            if n:
326                return n, False
327            else:
328                # Add a new cache slot for the locator
329                self._resize_cache(self.cache_max, self._max_slots-1)
330                while len(self._cache) >= self._max_slots:
331                    # If there isn't a slot available, need to wait
332                    # for something to happen that releases one of the
333                    # cache slots.  Idle for 200 ms or woken up by
334                    # another thread
335                    self._cache_updating.wait(timeout=0.2)
336                    self._resize_cache(self.cache_max, self._max_slots-1)
337
338                if self._disk_cache:
339                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
340                else:
341                    n = KeepBlockCache.CacheSlot(locator)
342                self._cache[n.locator] = n
343                return n, True
344
345    def set(self, slot, blob):
346        try:
347            if slot.set(blob):
348                self.cache_total += slot.size()
349            return
350        except OSError as e:
351            if e.errno == errno.ENOMEM:
352                # Reduce max slots to current - 4, cap cache and retry
353                with self._cache_lock:
354                    self._max_slots = max(4, len(self._cache) - 4)
355            elif e.errno == errno.ENOSPC:
356                # Reduce disk max space to current - 256 MiB, cap cache and retry
357                with self._cache_lock:
358                    sm = sum(st.size() for st in self._cache.values())
359                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
360            elif e.errno == errno.ENODEV:
361                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
362        except Exception as e:
363            pass
364        finally:
365            # Check if we should evict things from the cache.  Either
366            # because we added a new thing or there was an error and
367            # we possibly adjusted the limits down, so we might need
368            # to push something out.
369            self.cap_cache()
370
371        try:
372            # Only gets here if there was an error the first time. The
373            # exception handler adjusts limits downward in some cases
374            # to free up resources, which would make the operation
375            # succeed.
376            if slot.set(blob):
377                self.cache_total += slot.size()
378        except Exception as e:
379            # It failed again.  Give up.
380            slot.set(None)
381            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
382
383        self.cap_cache()
KeepBlockCache(cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None)
184    def __init__(self, cache_max=0, max_slots=0, disk_cache=False, disk_cache_dir=None):
185        self.cache_max = cache_max
186        self._cache = collections.OrderedDict()
187        self._cache_lock = threading.Lock()
188        self._max_slots = max_slots
189        self._disk_cache = disk_cache
190        self._disk_cache_dir = disk_cache_dir
191        self._cache_updating = threading.Condition(self._cache_lock)
192
193        if self._disk_cache and self._disk_cache_dir is None:
194            self._disk_cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "arvados", "keep")
195            os.makedirs(self._disk_cache_dir, mode=0o700, exist_ok=True)
196
197        if self._max_slots == 0:
198            if self._disk_cache:
199                # Each block uses two file descriptors, one used to
200                # open it initially and hold the flock(), and a second
201                # hidden one used by mmap().
202                #
203                # Set max slots to 1/8 of maximum file handles.  This
204                # means we'll use at most 1/4 of total file handles.
205                #
206                # NOFILE typically defaults to 1024 on Linux so this
207                # is 128 slots (256 file handles), which means we can
208                # cache up to 8 GiB of 64 MiB blocks.  This leaves
209                # 768 file handles for sockets and other stuff.
210                #
211                # When we want the ability to have more cache (e.g. in
212                # arv-mount) we'll increase rlimit before calling
213                # this.
214                self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
215            else:
216                # RAM cache slots
217                self._max_slots = 512
218
219        if self.cache_max == 0:
220            if self._disk_cache:
221                fs = os.statvfs(self._disk_cache_dir)
222                # Calculation of available space incorporates existing cache usage
223                existing_usage = arvados.diskcache.DiskCacheSlot.cache_usage(self._disk_cache_dir)
224                avail = (fs.f_bavail * fs.f_bsize + existing_usage) / 4
225                maxdisk = int((fs.f_blocks * fs.f_bsize) * 0.10)
226                # pick smallest of:
227                # 10% of total disk size
228                # 25% of available space
229                # max_slots * 64 MiB
230                self.cache_max = min(min(maxdisk, avail), (self._max_slots * 64 * 1024 * 1024))
231            else:
232                # 256 MiB in RAM
233                self.cache_max = (256 * 1024 * 1024)
234
235        self.cache_max = max(self.cache_max, 64 * 1024 * 1024)
236
237        self.cache_total = 0
238        if self._disk_cache:
239            self._cache = arvados.diskcache.DiskCacheSlot.init_cache(self._disk_cache_dir, self._max_slots)
240            for slot in self._cache.values():
241                self.cache_total += slot.size()
242            self.cap_cache()
cache_max
cache_total
def cap_cache(self):
292    def cap_cache(self):
293        '''Cap the cache size to self.cache_max'''
294        with self._cache_updating:
295            self._resize_cache(self.cache_max, self._max_slots)
296            self._cache_updating.notify_all()

Cap the cache size to self.cache_max

def get(self, locator):
316    def get(self, locator):
317        with self._cache_lock:
318            return self._get(locator)
def reserve_cache(self, locator):
320    def reserve_cache(self, locator):
321        '''Reserve a cache slot for the specified locator,
322        or return the existing slot.'''
323        with self._cache_updating:
324            n = self._get(locator)
325            if n:
326                return n, False
327            else:
328                # Add a new cache slot for the locator
329                self._resize_cache(self.cache_max, self._max_slots-1)
330                while len(self._cache) >= self._max_slots:
331                    # If there isn't a slot available, need to wait
332                    # for something to happen that releases one of the
333                    # cache slots.  Idle for 200 ms or woken up by
334                    # another thread
335                    self._cache_updating.wait(timeout=0.2)
336                    self._resize_cache(self.cache_max, self._max_slots-1)
337
338                if self._disk_cache:
339                    n = arvados.diskcache.DiskCacheSlot(locator, self._disk_cache_dir)
340                else:
341                    n = KeepBlockCache.CacheSlot(locator)
342                self._cache[n.locator] = n
343                return n, True

Reserve a cache slot for the specified locator, or return the existing slot.

def set(self, slot, blob):
345    def set(self, slot, blob):
346        try:
347            if slot.set(blob):
348                self.cache_total += slot.size()
349            return
350        except OSError as e:
351            if e.errno == errno.ENOMEM:
352                # Reduce max slots to current - 4, cap cache and retry
353                with self._cache_lock:
354                    self._max_slots = max(4, len(self._cache) - 4)
355            elif e.errno == errno.ENOSPC:
356                # Reduce disk max space to current - 256 MiB, cap cache and retry
357                with self._cache_lock:
358                    sm = sum(st.size() for st in self._cache.values())
359                    self.cache_max = max((256 * 1024 * 1024), sm - (256 * 1024 * 1024))
360            elif e.errno == errno.ENODEV:
361                _logger.error("Unable to use disk cache: The underlying filesystem does not support memory mapping.")
362        except Exception as e:
363            pass
364        finally:
365            # Check if we should evict things from the cache.  Either
366            # because we added a new thing or there was an error and
367            # we possibly adjusted the limits down, so we might need
368            # to push something out.
369            self.cap_cache()
370
371        try:
372            # Only gets here if there was an error the first time. The
373            # exception handler adjusts limits downward in some cases
374            # to free up resources, which would make the operation
375            # succeed.
376            if slot.set(blob):
377                self.cache_total += slot.size()
378        except Exception as e:
379            # It failed again.  Give up.
380            slot.set(None)
381            raise arvados.errors.KeepCacheError("Unable to save block %s to disk cache: %s" % (slot.locator, e))
382
383        self.cap_cache()
class KeepBlockCache.CacheSlot:
244    class CacheSlot(object):
245        __slots__ = ("locator", "ready", "content")
246
247        def __init__(self, locator):
248            self.locator = locator
249            self.ready = threading.Event()
250            self.content = None
251
252        def get(self):
253            self.ready.wait()
254            return self.content
255
256        def set(self, value):
257            if self.content is not None:
258                return False
259            self.content = value
260            self.ready.set()
261            return True
262
263        def size(self):
264            if self.content is None:
265                return 0
266            else:
267                return len(self.content)
268
269        def evict(self):
270            self.content = None
KeepBlockCache.CacheSlot(locator)
247        def __init__(self, locator):
248            self.locator = locator
249            self.ready = threading.Event()
250            self.content = None
locator
ready
content
def get(self):
252        def get(self):
253            self.ready.wait()
254            return self.content
def set(self, value):
256        def set(self, value):
257            if self.content is not None:
258                return False
259            self.content = value
260            self.ready.set()
261            return True
def size(self):
263        def size(self):
264            if self.content is None:
265                return 0
266            else:
267                return len(self.content)
def evict(self):
269        def evict(self):
270            self.content = None
class Counter:
385class Counter(object):
386    def __init__(self, v=0):
387        self._lk = threading.Lock()
388        self._val = v
389
390    def add(self, v):
391        with self._lk:
392            self._val += v
393
394    def get(self):
395        with self._lk:
396            return self._val
Counter(v=0)
386    def __init__(self, v=0):
387        self._lk = threading.Lock()
388        self._val = v
def add(self, v):
390    def add(self, v):
391        with self._lk:
392            self._val += v
def get(self):
394    def get(self):
395        with self._lk:
396            return self._val
class KeepClient:
 399class KeepClient(object):
 400    DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
 401    DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
 402
 403    class KeepService(PyCurlHelper):
 404        """Make requests to a single Keep service, and track results.
 405
 406        A KeepService is intended to last long enough to perform one
 407        transaction (GET or PUT) against one Keep service. This can
 408        involve calling either get() or put() multiple times in order
 409        to retry after transient failures. However, calling both get()
 410        and put() on a single instance -- or using the same instance
 411        to access two different Keep services -- will not produce
 412        sensible behavior.
 413        """
 414
 415        HTTP_ERRORS = (
 416            socket.error,
 417            ssl.SSLError,
 418            arvados.errors.HttpError,
 419        )
 420
 421        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
 422                     upload_counter=None,
 423                     download_counter=None,
 424                     headers={},
 425                     insecure=False):
 426            super(KeepClient.KeepService, self).__init__()
 427            self.root = root
 428            self._user_agent_pool = user_agent_pool
 429            self._result = {'error': None}
 430            self._usable = True
 431            self._session = None
 432            self._socket = None
 433            self.get_headers = {'Accept': 'application/octet-stream'}
 434            self.get_headers.update(headers)
 435            self.put_headers = headers
 436            self.upload_counter = upload_counter
 437            self.download_counter = download_counter
 438            self.insecure = insecure
 439
 440        def usable(self):
 441            """Is it worth attempting a request?"""
 442            return self._usable
 443
 444        def finished(self):
 445            """Did the request succeed or encounter permanent failure?"""
 446            return self._result['error'] == False or not self._usable
 447
 448        def last_result(self):
 449            return self._result
 450
 451        def _get_user_agent(self):
 452            try:
 453                return self._user_agent_pool.get(block=False)
 454            except queue.Empty:
 455                return pycurl.Curl()
 456
 457        def _put_user_agent(self, ua):
 458            try:
 459                ua.reset()
 460                self._user_agent_pool.put(ua, block=False)
 461            except:
 462                ua.close()
 463
 464        def get(self, locator, method="GET", timeout=None):
 465            # locator is a KeepLocator object.
 466            url = self.root + str(locator)
 467            _logger.debug("Request: %s %s", method, url)
 468            curl = self._get_user_agent()
 469            ok = None
 470            try:
 471                with timer.Timer() as t:
 472                    self._headers = {}
 473                    response_body = BytesIO()
 474                    curl.setopt(pycurl.NOSIGNAL, 1)
 475                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 476                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 477                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 478                    curl.setopt(pycurl.HTTPHEADER, [
 479                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
 480                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 481                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 482                    if self.insecure:
 483                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 484                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 485                    else:
 486                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 487                    if method == "HEAD":
 488                        curl.setopt(pycurl.NOBODY, True)
 489                    else:
 490                        curl.setopt(pycurl.HTTPGET, True)
 491                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 492
 493                    try:
 494                        curl.perform()
 495                    except Exception as e:
 496                        raise arvados.errors.HttpError(0, str(e))
 497                    finally:
 498                        if self._socket:
 499                            self._socket.close()
 500                            self._socket = None
 501                    self._result = {
 502                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 503                        'body': response_body.getvalue(),
 504                        'headers': self._headers,
 505                        'error': False,
 506                    }
 507
 508                ok = retry.check_http_response_success(self._result['status_code'])
 509                if not ok:
 510                    self._result['error'] = arvados.errors.HttpError(
 511                        self._result['status_code'],
 512                        self._headers.get('x-status-line', 'Error'))
 513            except self.HTTP_ERRORS as e:
 514                self._result = {
 515                    'error': e,
 516                }
 517            self._usable = ok != False
 518            if self._result.get('status_code', None):
 519                # The client worked well enough to get an HTTP status
 520                # code, so presumably any problems are just on the
 521                # server side and it's OK to reuse the client.
 522                self._put_user_agent(curl)
 523            else:
 524                # Don't return this client to the pool, in case it's
 525                # broken.
 526                curl.close()
 527            if not ok:
 528                _logger.debug("Request fail: GET %s => %s: %s",
 529                              url, type(self._result['error']), str(self._result['error']))
 530                return None
 531            if method == "HEAD":
 532                _logger.info("HEAD %s: %s bytes",
 533                         self._result['status_code'],
 534                         self._result.get('content-length'))
 535                if self._result['headers'].get('x-keep-locator'):
 536                    # This is a response to a remote block copy request, return
 537                    # the local copy block locator.
 538                    return self._result['headers'].get('x-keep-locator')
 539                return True
 540
 541            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
 542                         self._result['status_code'],
 543                         len(self._result['body']),
 544                         t.msecs,
 545                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
 546
 547            if self.download_counter:
 548                self.download_counter.add(len(self._result['body']))
 549            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
 550            if resp_md5 != locator.md5sum:
 551                _logger.warning("Checksum fail: md5(%s) = %s",
 552                                url, resp_md5)
 553                self._result['error'] = arvados.errors.HttpError(
 554                    0, 'Checksum fail')
 555                return None
 556            return self._result['body']
 557
 558        def put(self, hash_s, body, timeout=None, headers={}):
 559            put_headers = copy.copy(self.put_headers)
 560            put_headers.update(headers)
 561            url = self.root + hash_s
 562            _logger.debug("Request: PUT %s", url)
 563            curl = self._get_user_agent()
 564            ok = None
 565            try:
 566                with timer.Timer() as t:
 567                    self._headers = {}
 568                    body_reader = BytesIO(body)
 569                    response_body = BytesIO()
 570                    curl.setopt(pycurl.NOSIGNAL, 1)
 571                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
 572                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
 573                    curl.setopt(pycurl.URL, url.encode('utf-8'))
 574                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
 575                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
 576                    # response) instead of sending the request body immediately.
 577                    # This allows the server to reject the request if the request
 578                    # is invalid or the server is read-only, without waiting for
 579                    # the client to send the entire block.
 580                    curl.setopt(pycurl.UPLOAD, True)
 581                    curl.setopt(pycurl.INFILESIZE, len(body))
 582                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
 583                    curl.setopt(pycurl.HTTPHEADER, [
 584                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
 585                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
 586                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
 587                    if self.insecure:
 588                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
 589                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
 590                    else:
 591                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
 592                    self._setcurltimeouts(curl, timeout)
 593                    try:
 594                        curl.perform()
 595                    except Exception as e:
 596                        raise arvados.errors.HttpError(0, str(e))
 597                    finally:
 598                        if self._socket:
 599                            self._socket.close()
 600                            self._socket = None
 601                    self._result = {
 602                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
 603                        'body': response_body.getvalue().decode('utf-8'),
 604                        'headers': self._headers,
 605                        'error': False,
 606                    }
 607                ok = retry.check_http_response_success(self._result['status_code'])
 608                if not ok:
 609                    self._result['error'] = arvados.errors.HttpError(
 610                        self._result['status_code'],
 611                        self._headers.get('x-status-line', 'Error'))
 612            except self.HTTP_ERRORS as e:
 613                self._result = {
 614                    'error': e,
 615                }
 616            self._usable = ok != False # still usable if ok is True or None
 617            if self._result.get('status_code', None):
 618                # Client is functional. See comment in get().
 619                self._put_user_agent(curl)
 620            else:
 621                curl.close()
 622            if not ok:
 623                _logger.debug("Request fail: PUT %s => %s: %s",
 624                              url, type(self._result['error']), str(self._result['error']))
 625                return False
 626            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
 627                         self._result['status_code'],
 628                         len(body),
 629                         t.msecs,
 630                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
 631            if self.upload_counter:
 632                self.upload_counter.add(len(body))
 633            return True
 634
 635
 636    class KeepWriterQueue(queue.Queue):
 637        def __init__(self, copies, classes=[]):
 638            queue.Queue.__init__(self) # Old-style superclass
 639            self.wanted_copies = copies
 640            self.wanted_storage_classes = classes
 641            self.successful_copies = 0
 642            self.confirmed_storage_classes = {}
 643            self.response = None
 644            self.storage_classes_tracking = True
 645            self.queue_data_lock = threading.RLock()
 646            self.pending_tries = max(copies, len(classes))
 647            self.pending_tries_notification = threading.Condition()
 648
 649        def write_success(self, response, replicas_nr, classes_confirmed):
 650            with self.queue_data_lock:
 651                self.successful_copies += replicas_nr
 652                if classes_confirmed is None:
 653                    self.storage_classes_tracking = False
 654                elif self.storage_classes_tracking:
 655                    for st_class, st_copies in classes_confirmed.items():
 656                        try:
 657                            self.confirmed_storage_classes[st_class] += st_copies
 658                        except KeyError:
 659                            self.confirmed_storage_classes[st_class] = st_copies
 660                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
 661                self.response = response
 662            with self.pending_tries_notification:
 663                self.pending_tries_notification.notify_all()
 664
 665        def write_fail(self, ks):
 666            with self.pending_tries_notification:
 667                self.pending_tries += 1
 668                self.pending_tries_notification.notify()
 669
 670        def pending_copies(self):
 671            with self.queue_data_lock:
 672                return self.wanted_copies - self.successful_copies
 673
 674        def satisfied_classes(self):
 675            with self.queue_data_lock:
 676                if not self.storage_classes_tracking:
 677                    # Notifies disabled storage classes expectation to
 678                    # the outer loop.
 679                    return None
 680            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
 681
 682        def pending_classes(self):
 683            with self.queue_data_lock:
 684                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
 685                    return []
 686                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
 687                for st_class, st_copies in self.confirmed_storage_classes.items():
 688                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
 689                        unsatisfied_classes.remove(st_class)
 690                return unsatisfied_classes
 691
 692        def get_next_task(self):
 693            with self.pending_tries_notification:
 694                while True:
 695                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
 696                        # This notify_all() is unnecessary --
 697                        # write_success() already called notify_all()
 698                        # when pending<1 became true, so it's not
 699                        # possible for any other thread to be in
 700                        # wait() now -- but it's cheap insurance
 701                        # against deadlock so we do it anyway:
 702                        self.pending_tries_notification.notify_all()
 703                        # Drain the queue and then raise Queue.Empty
 704                        while True:
 705                            self.get_nowait()
 706                            self.task_done()
 707                    elif self.pending_tries > 0:
 708                        service, service_root = self.get_nowait()
 709                        if service.finished():
 710                            self.task_done()
 711                            continue
 712                        self.pending_tries -= 1
 713                        return service, service_root
 714                    elif self.empty():
 715                        self.pending_tries_notification.notify_all()
 716                        raise queue.Empty
 717                    else:
 718                        self.pending_tries_notification.wait()
 719
 720
 721    class KeepWriterThreadPool(object):
 722        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
 723            self.total_task_nr = 0
 724            if (not max_service_replicas) or (max_service_replicas >= copies):
 725                num_threads = 1
 726            else:
 727                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
 728            _logger.debug("Pool max threads is %d", num_threads)
 729            self.workers = []
 730            self.queue = KeepClient.KeepWriterQueue(copies, classes)
 731            # Create workers
 732            for _ in range(num_threads):
 733                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
 734                self.workers.append(w)
 735
 736        def add_task(self, ks, service_root):
 737            self.queue.put((ks, service_root))
 738            self.total_task_nr += 1
 739
 740        def done(self):
 741            return self.queue.successful_copies, self.queue.satisfied_classes()
 742
 743        def join(self):
 744            # Start workers
 745            for worker in self.workers:
 746                worker.start()
 747            # Wait for finished work
 748            self.queue.join()
 749
 750        def response(self):
 751            return self.queue.response
 752
 753
    class KeepWriterThread(threading.Thread):
        """Worker that performs PUT requests for a KeepWriterThreadPool.

        Repeatedly takes a (KeepService, service_root) task from the
        shared KeepWriterQueue and tries to store the block on that
        service, reporting success/failure back to the queue, until the
        queue raises queue.Empty (write goals met or services exhausted).
        """

        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            """Set up a worker.

            :queue: KeepWriterQueue shared with sibling threads.
            :data: block content (bytes); :data_hash: its md5 locator.
            :timeout: per-request timeout passed through to KeepService.put().
            """
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            # Daemon thread: don't block interpreter exit if a write hangs.
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    # No more work is needed; let the thread exit.
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    # TaskFailed was already logged in do_task; anything
                    # else is unexpected and deserves a traceback.
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    # Pair with queue.join() in KeepWriterThreadPool.join().
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """Attempt one PUT against `service`.

            Returns (locator, replicas_stored, classes_confirmed) on
            success; raises TaskFailed on failure.
            """
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                # Sort for a deterministic header value.
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            # Replica count reported by the server; assume 1 when the
            # header is missing or malformed.
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            # Header format (from the parsing below): "class1=N, class2=M".
            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
 824
 825
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=10, session=None, num_prefetch_threads=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to KeepClient does not use a proxy, pass in an empty
          string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 10.
        """
        self.lock = threading.Lock()
        # Resolve the proxy setting from config when not given explicitly;
        # ARVADOS_KEEP_SERVICES takes precedence over ARVADOS_KEEP_PROXY.
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        # Resolve the API token: from the API client if one was given,
        # otherwise from the environment/config.  Giving both is an error.
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        # TLS verification policy follows the API client when available.
        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable pycurl handles shared by this client's requests.
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []
        if num_prefetch_threads is not None:
            self.num_prefetch_threads = num_prefetch_threads
        else:
            self.num_prefetch_threads = 2
        self._prefetch_queue = None
        self._prefetch_threads = None

        if local_store:
            # Local-store mode: route head/get/put to the local directory
            # implementations instead of talking to Keep services.
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                # Proxy mode: build a static service list from the given
                # (whitespace-separated) proxy URIs; no API lookup needed.
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                # Discover the cluster's default storage classes, if the
                # cluster config exposes them.
                try:
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
 970
 971    def current_timeout(self, attempt_number):
 972        """Return the appropriate timeout to use for this client.
 973
 974        The proxy timeout setting if the backend service is currently a proxy,
 975        the regular timeout setting otherwise.  The `attempt_number` indicates
 976        how many times the operation has been tried already (starting from 0
 977        for the first try), and scales the connection timeout portion of the
 978        return value accordingly.
 979
 980        """
 981        # TODO(twp): the timeout should be a property of a
 982        # KeepService, not a KeepClient. See #4488.
 983        t = self.proxy_timeout if self.using_proxy else self.timeout
 984        if len(t) == 2:
 985            return (t[0] * (1 << attempt_number), t[1])
 986        else:
 987            return (t[0] * (1 << attempt_number), t[1], t[2])
 988    def _any_nondisk_services(self, service_list):
 989        return any(ks.get('service_type', 'disk') != 'disk'
 990                   for ks in service_list)
 991
    def build_services_list(self, force_rebuild=False):
        """Fetch and cache the list of Keep services from the API server.

        Populates _gateway_services, _keep_services, _writable_services
        and max_replicas_per_service.  A no-op when a static (proxy)
        service list is in use, or when the list is already cached and
        force_rebuild is false.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
1034
1035    def _service_weight(self, data_hash, service_uuid):
1036        """Compute the weight of a Keep service endpoint for a data
1037        block with a known hash.
1038
1039        The weight is md5(h + u) where u is the last 15 characters of
1040        the service endpoint's UUID.
1041        """
1042        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
1043
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.

        :locator: a KeepLocator; its hints and md5sum drive the ordering.
        :force_rebuild: re-fetch the service list from the API server.
        :need_writable: restrict the local services to writable ones.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    # 5-character cluster ID: derive the public proxy URL.
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    # 27-character service UUID: look up a known gateway.
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots
1079
1080    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1081        # roots_map is a dictionary, mapping Keep service root strings
1082        # to KeepService objects.  Poll for Keep services, and add any
1083        # new ones to roots_map.  Return the current list of local
1084        # root strings.
1085        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1086        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1087        for root in local_roots:
1088            if root not in roots_map:
1089                roots_map[root] = self.KeepService(
1090                    root, self._user_agent_pool,
1091                    upload_counter=self.upload_counter,
1092                    download_counter=self.download_counter,
1093                    headers=headers,
1094                    insecure=self.insecure)
1095        return local_roots
1096
1097    @staticmethod
1098    def _check_loop_result(result):
1099        # KeepClient RetryLoops should save results as a 2-tuple: the
1100        # actual result of the request, and the number of servers available
1101        # to receive the request this round.
1102        # This method returns True if there's a real result, False if
1103        # there are no more servers available, otherwise None.
1104        if isinstance(result, Exception):
1105            return None
1106        result, tried_server_count = result
1107        if (result is not None) and (result is not False):
1108            return True
1109        elif tried_server_count < 1:
1110            _logger.info("No more Keep services to try; giving up")
1111            return False
1112        else:
1113            return None
1114
1115    def get_from_cache(self, loc_s):
1116        """Fetch a block only if is in the cache, otherwise return None."""
1117        locator = KeepLocator(loc_s)
1118        slot = self.block_cache.get(locator.md5sum)
1119        if slot is not None and slot.ready.is_set():
1120            return slot.get()
1121        else:
1122            return None
1123
1124    def refresh_signature(self, loc):
1125        """Ask Keep to get the remote block and return its local signature"""
1126        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1127        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1128
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Send a HEAD request for a block; see _get_or_head() for arguments.

        Retries are handled by @retry.retry_method (presumably filling in a
        default num_retries -- see arvados.retry).
        """
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1132
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Fetch block data from Keep; see _get_or_head() for arguments."""
        return self._get_or_head(loc_s, method="GET", **kwargs)
1136
    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        * method: "GET" (download and cache the block) or "HEAD" (probe
          without downloading).
        * prefetch: when True, treat this as a background cache-fill and
          return immediately if a fetch for the block is already in flight.
        """
        if ',' in loc_s:
            # Multiple locators: fetch each and concatenate.
            # NOTE(review): get() normally returns bytes, but ''.join()
            # requires str items -- confirm this multi-locator path is
            # actually exercised and correct.
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                # Reserve a cache slot (or join an existing one).  Any slot
                # still held when we leave must be filled by the finally
                # block so threads waiting on it are unblocked.
                while slot is None:
                    slot, first = self.block_cache.reserve_cache(locator.md5sum)
                    if first:
                        # Fresh and empty "first time it is used" slot
                        break
                    if prefetch:
                        # this is request for a prefetch to fill in
                        # the cache, don't need to wait for the
                        # result, so if it is already in flight return
                        # immediately.  Clear 'slot' to prevent
                        # finally block from calling slot.set()
                        if slot.ready.is_set():
                            slot.get()
                        slot = None
                        return None

                    blob = slot.get()
                    if blob is not None:
                        self.hits_counter.add(1)
                        return blob

                    # If blob is None, this means either
                    #
                    # (a) another thread was fetching this block and
                    # failed with an error or
                    #
                    # (b) cache thrashing caused the slot to be
                    # evicted (content set to None) by another thread
                    # between the call to reserve_cache() and get().
                    #
                    # We'll handle these cases by reserving a new slot
                    # and then doing a full GET request.
                    slot = None

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            sorted_roots = []
            # NOTE(review): this rebinding discards the hint-root KeepService
            # objects built just above.  map_new_services() recreates entries
            # for every root returned by weighted_service_roots() (which also
            # lists the hint roots), so behavior looks unaffected aside from
            # object churn -- confirm the reset is intentional.
            roots_map = {}
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                # Fill the reserved slot (blob is None on failure) so any
                # threads blocked waiting on it can proceed.
                self.block_cache.set(slot, blob)

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1286
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        classes = classes or self._default_classes

        # Blocks are hashed as bytes; accept str input like the rest of
        # the client does.
        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            # Nothing needs to be uploaded; the bare locator is enough.
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # Replicas written so far, and storage classes satisfied so far.
        # done_classes becomes None once we learn the cluster predates
        # storage-class support.
        done_copies = 0
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Only request the storage classes not yet satisfied by
            # earlier rounds.
            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done_copies,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left),
                                                        classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                    writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1392
1393    def _block_prefetch_worker(self):
1394        """The background downloader thread."""
1395        while True:
1396            try:
1397                b = self._prefetch_queue.get()
1398                if b is None:
1399                    return
1400                self.get(b, prefetch=True)
1401            except Exception:
1402                _logger.exception("Exception doing block prefetch")
1403
1404    def _start_prefetch_threads(self):
1405        if self._prefetch_threads is None:
1406            with self.lock:
1407                if self._prefetch_threads is not None:
1408                    return
1409                self._prefetch_queue = queue.Queue()
1410                self._prefetch_threads = []
1411                for i in range(0, self.num_prefetch_threads):
1412                    thread = threading.Thread(target=self._block_prefetch_worker)
1413                    self._prefetch_threads.append(thread)
1414                    thread.daemon = True
1415                    thread.start()
1416
1417    def block_prefetch(self, locator):
1418        """
1419        This relies on the fact that KeepClient implements a block cache,
1420        so repeated requests for the same block will not result in repeated
1421        downloads (unless the block is evicted from the cache.)  This method
1422        does not block.
1423        """
1424
1425        if self.block_cache.get(locator) is not None:
1426            return
1427
1428        self._start_prefetch_threads()
1429        self._prefetch_queue.put(locator)
1430
1431    def stop_prefetch_threads(self):
1432        with self.lock:
1433            if self._prefetch_threads is not None:
1434                for t in self._prefetch_threads:
1435                    self._prefetch_queue.put(None)
1436                for t in self._prefetch_threads:
1437                    t.join()
1438            self._prefetch_threads = None
1439            self._prefetch_queue = None
1440
1441    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1442        """A stub for put().
1443
1444        This method is used in place of the real put() method when
1445        using local storage (see constructor's local_store argument).
1446
1447        copies and num_retries arguments are ignored: they are here
1448        only for the sake of offering the same call signature as
1449        put().
1450
1451        Data stored this way can be retrieved via local_store_get().
1452        """
1453        md5 = hashlib.md5(data).hexdigest()
1454        locator = '%s+%d' % (md5, len(data))
1455        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1456            f.write(data)
1457        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1458                  os.path.join(self.local_store, md5))
1459        return locator
1460
1461    def local_store_get(self, loc_s, num_retries=None):
1462        """Companion to local_store_put()."""
1463        try:
1464            locator = KeepLocator(loc_s)
1465        except ValueError:
1466            raise arvados.errors.NotFoundError(
1467                "Invalid data locator: '%s'" % loc_s)
1468        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1469            return b''
1470        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1471            return f.read()
1472
    def local_store_head(self, loc_s, num_retries=None):
        """Companion to local_store_put(): report whether a block exists locally.

        num_retries is ignored; it exists to mirror head()'s signature.
        Returns True for the well-known empty block or when a file for
        the block's hash exists under local_store.
        NOTE(review): there is no explicit return for the missing-block
        case here, so it falls through to None (falsy, not False) --
        confirm callers only test truthiness.
        """
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return True
        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
            return True
KeepClient( api_client=None, proxy=None, timeout=(2, 256, 32768), proxy_timeout=(20, 256, 32768), api_token=None, local_store=None, block_cache=None, num_retries=10, session=None, num_prefetch_threads=None)
826    def __init__(self, api_client=None, proxy=None,
827                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
828                 api_token=None, local_store=None, block_cache=None,
829                 num_retries=10, session=None, num_prefetch_threads=None):
830        """Initialize a new KeepClient.
831
832        Arguments:
833        :api_client:
834          The API client to use to find Keep services.  If not
835          provided, KeepClient will build one from available Arvados
836          configuration.
837
838        :proxy:
839          If specified, this KeepClient will send requests to this Keep
840          proxy.  Otherwise, KeepClient will fall back to the setting of the
841          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
842          If you do not want KeepClient to use a proxy, pass in an
843          empty string.
844
845        :timeout:
846          The initial timeout (in seconds) for HTTP requests to Keep
847          non-proxy servers.  A tuple of three floats is interpreted as
848          (connection_timeout, read_timeout, minimum_bandwidth). A connection
849          will be aborted if the average traffic rate falls below
850          minimum_bandwidth bytes per second over an interval of read_timeout
851          seconds. Because timeouts are often a result of transient server
852          load, the actual connection timeout will be increased by a factor
853          of two on each retry.
854          Default: (2, 256, 32768).
855
856        :proxy_timeout:
857          The initial timeout (in seconds) for HTTP requests to
858          Keep proxies. A tuple of three floats is interpreted as
859          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
860          described above for adjusting connection timeouts on retry also
861          applies.
862          Default: (20, 256, 32768).
863
864        :api_token:
865          If you're not using an API client, but only talking
866          directly to a Keep proxy, this parameter specifies an API token
867          to authenticate Keep requests.  It is an error to specify both
868          api_client and api_token.  If you specify neither, KeepClient
869          will use one available from the Arvados configuration.
870
871        :local_store:
872          If specified, this KeepClient will bypass Keep
873          services, and save data to the named directory.  If unspecified,
874          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
875          environment variable.  If you want to ensure KeepClient does not
876          use local storage, pass in an empty string.  This is primarily
877          intended to mock a server for testing.
878
879        :num_retries:
880          The default number of times to retry failed requests.
881          This will be used as the default num_retries value when get() and
882          put() are called.  Default 10.
883        """
884        self.lock = threading.Lock()
885        if proxy is None:
886            if config.get('ARVADOS_KEEP_SERVICES'):
887                proxy = config.get('ARVADOS_KEEP_SERVICES')
888            else:
889                proxy = config.get('ARVADOS_KEEP_PROXY')
890        if api_token is None:
891            if api_client is None:
892                api_token = config.get('ARVADOS_API_TOKEN')
893            else:
894                api_token = api_client.api_token
895        elif api_client is not None:
896            raise ValueError(
897                "can't build KeepClient with both API client and token")
898        if local_store is None:
899            local_store = os.environ.get('KEEP_LOCAL_STORE')
900
901        if api_client is None:
902            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
903        else:
904            self.insecure = api_client.insecure
905
906        self.block_cache = block_cache if block_cache else KeepBlockCache()
907        self.timeout = timeout
908        self.proxy_timeout = proxy_timeout
909        self._user_agent_pool = queue.LifoQueue()
910        self.upload_counter = Counter()
911        self.download_counter = Counter()
912        self.put_counter = Counter()
913        self.get_counter = Counter()
914        self.hits_counter = Counter()
915        self.misses_counter = Counter()
916        self._storage_classes_unsupported_warning = False
917        self._default_classes = []
918        if num_prefetch_threads is not None:
919            self.num_prefetch_threads = num_prefetch_threads
920        else:
921            self.num_prefetch_threads = 2
922        self._prefetch_queue = None
923        self._prefetch_threads = None
924
925        if local_store:
926            self.local_store = local_store
927            self.head = self.local_store_head
928            self.get = self.local_store_get
929            self.put = self.local_store_put
930        else:
931            self.num_retries = num_retries
932            self.max_replicas_per_service = None
933            if proxy:
934                proxy_uris = proxy.split()
935                for i in range(len(proxy_uris)):
936                    if not proxy_uris[i].endswith('/'):
937                        proxy_uris[i] += '/'
938                    # URL validation
939                    url = urllib.parse.urlparse(proxy_uris[i])
940                    if not (url.scheme and url.netloc):
941                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
942                self.api_token = api_token
943                self._gateway_services = {}
944                self._keep_services = [{
945                    'uuid': "00000-bi6l4-%015d" % idx,
946                    'service_type': 'proxy',
947                    '_service_root': uri,
948                    } for idx, uri in enumerate(proxy_uris)]
949                self._writable_services = self._keep_services
950                self.using_proxy = True
951                self._static_services_list = True
952            else:
953                # It's important to avoid instantiating an API client
954                # unless we actually need one, for testing's sake.
955                if api_client is None:
956                    api_client = arvados.api('v1')
957                self.api_client = api_client
958                self.api_token = api_client.api_token
959                self._gateway_services = {}
960                self._keep_services = None
961                self._writable_services = None
962                self.using_proxy = None
963                self._static_services_list = False
964                try:
965                    self._default_classes = [
966                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
967                except KeyError:
968                    # We're talking to an old cluster
969                    pass

Initialize a new KeepClient.

Arguments: :api_client: The API client to use to find Keep services. If not provided, KeepClient will build one from available Arvados configuration.

:proxy: If specified, this KeepClient will send requests to this Keep proxy. Otherwise, KeepClient will fall back to the setting of the ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings. If you do not want KeepClient to use a proxy, pass in an empty string.

:timeout: The initial timeout (in seconds) for HTTP requests to Keep non-proxy servers. A tuple of three floats is interpreted as (connection_timeout, read_timeout, minimum_bandwidth). A connection will be aborted if the average traffic rate falls below minimum_bandwidth bytes per second over an interval of read_timeout seconds. Because timeouts are often a result of transient server load, the actual connection timeout will be increased by a factor of two on each retry. Default: (2, 256, 32768).

:proxy_timeout: The initial timeout (in seconds) for HTTP requests to Keep proxies. A tuple of three floats is interpreted as (connection_timeout, read_timeout, minimum_bandwidth). The behavior described above for adjusting connection timeouts on retry also applies. Default: (20, 256, 32768).

:api_token: If you’re not using an API client, but only talking directly to a Keep proxy, this parameter specifies an API token to authenticate Keep requests. It is an error to specify both api_client and api_token. If you specify neither, KeepClient will use one available from the Arvados configuration.

:local_store: If specified, this KeepClient will bypass Keep services, and save data to the named directory. If unspecified, KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE environment variable. If you want to ensure KeepClient does not use local storage, pass in an empty string. This is primarily intended to mock a server for testing.

:num_retries: The default number of times to retry failed requests. This will be used as the default num_retries value when get() and put() are called. Default 10.

DEFAULT_TIMEOUT = (2, 256, 32768)
DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
lock
block_cache
timeout
proxy_timeout
upload_counter
download_counter
put_counter
get_counter
hits_counter
misses_counter
def current_timeout(self, attempt_number):
971    def current_timeout(self, attempt_number):
972        """Return the appropriate timeout to use for this client.
973
974        The proxy timeout setting if the backend service is currently a proxy,
975        the regular timeout setting otherwise.  The `attempt_number` indicates
976        how many times the operation has been tried already (starting from 0
977        for the first try), and scales the connection timeout portion of the
978        return value accordingly.
979
980        """
981        # TODO(twp): the timeout should be a property of a
982        # KeepService, not a KeepClient. See #4488.
983        t = self.proxy_timeout if self.using_proxy else self.timeout
984        if len(t) == 2:
985            return (t[0] * (1 << attempt_number), t[1])
986        else:
987            return (t[0] * (1 << attempt_number), t[1], t[2])

Return the appropriate timeout to use for this client.

The proxy timeout setting if the backend service is currently a proxy, the regular timeout setting otherwise. The attempt_number indicates how many times the operation has been tried already (starting from 0 for the first try), and scales the connection timeout portion of the return value accordingly.

def build_services_list(self, force_rebuild=False):
 992    def build_services_list(self, force_rebuild=False):
 993        if (self._static_services_list or
 994              (self._keep_services and not force_rebuild)):
 995            return
 996        with self.lock:
 997            try:
 998                keep_services = self.api_client.keep_services().accessible()
 999            except Exception:  # API server predates Keep services.
1000                keep_services = self.api_client.keep_disks().list()
1001
1002            # Gateway services are only used when specified by UUID,
1003            # so there's nothing to gain by filtering them by
1004            # service_type.
1005            self._gateway_services = {ks['uuid']: ks for ks in
1006                                      keep_services.execute()['items']}
1007            if not self._gateway_services:
1008                raise arvados.errors.NoKeepServersError()
1009
1010            # Precompute the base URI for each service.
1011            for r in self._gateway_services.values():
1012                host = r['service_host']
1013                if not host.startswith('[') and host.find(':') >= 0:
1014                    # IPv6 URIs must be formatted like http://[::1]:80/...
1015                    host = '[' + host + ']'
1016                r['_service_root'] = "{}://{}:{:d}/".format(
1017                    'https' if r['service_ssl_flag'] else 'http',
1018                    host,
1019                    r['service_port'])
1020
1021            _logger.debug(str(self._gateway_services))
1022            self._keep_services = [
1023                ks for ks in self._gateway_services.values()
1024                if not ks.get('service_type', '').startswith('gateway:')]
1025            self._writable_services = [ks for ks in self._keep_services
1026                                       if not ks.get('read_only')]
1027
1028            # For disk type services, max_replicas_per_service is 1
1029            # It is unknown (unlimited) for other service types.
1030            if self._any_nondisk_services(self._writable_services):
1031                self.max_replicas_per_service = None
1032            else:
1033                self.max_replicas_per_service = 1
def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1044    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
1045        """Return an array of Keep service endpoints, in the order in
1046        which they should be probed when reading or writing data with
1047        the given hash+hints.
1048        """
1049        self.build_services_list(force_rebuild)
1050
1051        sorted_roots = []
1052        # Use the services indicated by the given +K@... remote
1053        # service hints, if any are present and can be resolved to a
1054        # URI.
1055        for hint in locator.hints:
1056            if hint.startswith('K@'):
1057                if len(hint) == 7:
1058                    sorted_roots.append(
1059                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
1060                elif len(hint) == 29:
1061                    svc = self._gateway_services.get(hint[2:])
1062                    if svc:
1063                        sorted_roots.append(svc['_service_root'])
1064
1065        # Sort the available local services by weight (heaviest first)
1066        # for this locator, and return their service_roots (base URIs)
1067        # in that order.
1068        use_services = self._keep_services
1069        if need_writable:
1070            use_services = self._writable_services
1071        self.using_proxy = self._any_nondisk_services(use_services)
1072        sorted_roots.extend([
1073            svc['_service_root'] for svc in sorted(
1074                use_services,
1075                reverse=True,
1076                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1077        _logger.debug("{}: {}".format(locator, sorted_roots))
1078        return sorted_roots

Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing data with the given hash+hints.

def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1080    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1081        # roots_map is a dictionary, mapping Keep service root strings
1082        # to KeepService objects.  Poll for Keep services, and add any
1083        # new ones to roots_map.  Return the current list of local
1084        # root strings.
1085        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1086        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1087        for root in local_roots:
1088            if root not in roots_map:
1089                roots_map[root] = self.KeepService(
1090                    root, self._user_agent_pool,
1091                    upload_counter=self.upload_counter,
1092                    download_counter=self.download_counter,
1093                    headers=headers,
1094                    insecure=self.insecure)
1095        return local_roots
def get_from_cache(self, loc_s):
1115    def get_from_cache(self, loc_s):
1116        """Fetch a block only if is in the cache, otherwise return None."""
1117        locator = KeepLocator(loc_s)
1118        slot = self.block_cache.get(locator.md5sum)
1119        if slot is not None and slot.ready.is_set():
1120            return slot.get()
1121        else:
1122            return None

Fetch a block only if it is in the cache, otherwise return None.

def refresh_signature(self, loc):
1124    def refresh_signature(self, loc):
1125        """Ask Keep to get the remote block and return its local signature"""
1126        now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1127        return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})

Ask Keep to get the remote block and return its local signature

@retry.retry_method
def head(self, loc_s, **kwargs):
1129    @retry.retry_method
1130    def head(self, loc_s, **kwargs):
1131        return self._get_or_head(loc_s, method="HEAD", **kwargs)
@retry.retry_method
def get(self, loc_s, **kwargs):
1133    @retry.retry_method
1134    def get(self, loc_s, **kwargs):
1135        return self._get_or_head(loc_s, method="GET", **kwargs)
@retry.retry_method
def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1287    @retry.retry_method
1288    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1289        """Save data in Keep.
1290
1291        This method will get a list of Keep services from the API server, and
1292        send the data to each one simultaneously in a new thread.  Once the
1293        uploads are finished, if enough copies are saved, this method returns
1294        the most recent HTTP response body.  If requests fail to upload
1295        enough copies, this method raises KeepWriteError.
1296
1297        Arguments:
1298        * data: The string of data to upload.
1299        * copies: The number of copies that the user requires be saved.
1300          Default 2.
1301        * num_retries: The number of times to retry PUT requests to
1302          *each* Keep server if it returns temporary failures, with
1303          exponential backoff.  The default value is set when the
1304          KeepClient is initialized.
1305        * classes: An optional list of storage class names where copies should
1306          be written.
1307        """
1308
1309        classes = classes or self._default_classes
1310
1311        if not isinstance(data, bytes):
1312            data = data.encode()
1313
1314        self.put_counter.add(1)
1315
1316        data_hash = hashlib.md5(data).hexdigest()
1317        loc_s = data_hash + '+' + str(len(data))
1318        if copies < 1:
1319            return loc_s
1320        locator = KeepLocator(loc_s)
1321
1322        request_id = (request_id or
1323                      (hasattr(self, 'api_client') and self.api_client.request_id) or
1324                      arvados.util.new_request_id())
1325        headers = {
1326            'X-Request-Id': request_id,
1327            'X-Keep-Desired-Replicas': str(copies),
1328        }
1329        roots_map = {}
1330        loop = retry.RetryLoop(num_retries, self._check_loop_result,
1331                               backoff_start=2)
1332        done_copies = 0
1333        done_classes = []
1334        for tries_left in loop:
1335            try:
1336                sorted_roots = self.map_new_services(
1337                    roots_map, locator,
1338                    force_rebuild=(tries_left < num_retries),
1339                    need_writable=True,
1340                    headers=headers)
1341            except Exception as error:
1342                loop.save_result(error)
1343                continue
1344
1345            pending_classes = []
1346            if done_classes is not None:
1347                pending_classes = list(set(classes) - set(done_classes))
1348            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1349                                                        data_hash=data_hash,
1350                                                        copies=copies - done_copies,
1351                                                        max_service_replicas=self.max_replicas_per_service,
1352                                                        timeout=self.current_timeout(num_retries - tries_left),
1353                                                        classes=pending_classes)
1354            for service_root, ks in [(root, roots_map[root])
1355                                     for root in sorted_roots]:
1356                if ks.finished():
1357                    continue
1358                writer_pool.add_task(ks, service_root)
1359            writer_pool.join()
1360            pool_copies, pool_classes = writer_pool.done()
1361            done_copies += pool_copies
1362            if (done_classes is not None) and (pool_classes is not None):
1363                done_classes += pool_classes
1364                loop.save_result(
1365                    (done_copies >= copies and set(done_classes) == set(classes),
1366                    writer_pool.total_task_nr))
1367            else:
1368                # Old keepstore contacted without storage classes support:
1369                # success is determined only by successful copies.
1370                #
1371                # Disable storage classes tracking from this point forward.
1372                if not self._storage_classes_unsupported_warning:
1373                    self._storage_classes_unsupported_warning = True
1374                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1375                done_classes = None
1376                loop.save_result(
1377                    (done_copies >= copies, writer_pool.total_task_nr))
1378
1379        if loop.success():
1380            return writer_pool.response()
1381        if not roots_map:
1382            raise arvados.errors.KeepWriteError(
1383                "[{}] failed to write {}: no Keep services available ({})".format(
1384                    request_id, data_hash, loop.last_result()))
1385        else:
1386            service_errors = ((key, roots_map[key].last_result()['error'])
1387                              for key in sorted_roots
1388                              if roots_map[key].last_result()['error'])
1389            raise arvados.errors.KeepWriteError(
1390                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1391                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")

Save data in Keep.

This method will get a list of Keep services from the API server, and send the data to each one simultaneously in a new thread. Once the uploads are finished, if enough copies are saved, this method returns the most recent HTTP response body. If requests fail to upload enough copies, this method raises KeepWriteError.

Arguments:

  • data: The string of data to upload.
  • copies: The number of copies that the user requires be saved. Default 2.
  • num_retries: The number of times to retry PUT requests to each Keep server if it returns temporary failures, with exponential backoff. The default value is set when the KeepClient is initialized.
  • classes: An optional list of storage class names where copies should be written.
def block_prefetch(self, locator):
1417    def block_prefetch(self, locator):
1418        """
1419        This relies on the fact that KeepClient implements a block cache,
1420        so repeated requests for the same block will not result in repeated
1421        downloads (unless the block is evicted from the cache.)  This method
1422        does not block.
1423        """
1424
1425        if self.block_cache.get(locator) is not None:
1426            return
1427
1428        self._start_prefetch_threads()
1429        self._prefetch_queue.put(locator)

This relies on the fact that KeepClient implements a block cache, so repeated requests for the same block will not result in repeated downloads (unless the block is evicted from the cache.) This method does not block.

def stop_prefetch_threads(self):
1431    def stop_prefetch_threads(self):
1432        with self.lock:
1433            if self._prefetch_threads is not None:
1434                for t in self._prefetch_threads:
1435                    self._prefetch_queue.put(None)
1436                for t in self._prefetch_threads:
1437                    t.join()
1438            self._prefetch_threads = None
1439            self._prefetch_queue = None
def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1441    def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1442        """A stub for put().
1443
1444        This method is used in place of the real put() method when
1445        using local storage (see constructor's local_store argument).
1446
1447        copies and num_retries arguments are ignored: they are here
1448        only for the sake of offering the same call signature as
1449        put().
1450
1451        Data stored this way can be retrieved via local_store_get().
1452        """
1453        md5 = hashlib.md5(data).hexdigest()
1454        locator = '%s+%d' % (md5, len(data))
1455        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1456            f.write(data)
1457        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1458                  os.path.join(self.local_store, md5))
1459        return locator

A stub for put().

This method is used in place of the real put() method when using local storage (see constructor’s local_store argument).

copies and num_retries arguments are ignored: they are here only for the sake of offering the same call signature as put().

Data stored this way can be retrieved via local_store_get().

def local_store_get(self, loc_s, num_retries=None):
1461    def local_store_get(self, loc_s, num_retries=None):
1462        """Companion to local_store_put()."""
1463        try:
1464            locator = KeepLocator(loc_s)
1465        except ValueError:
1466            raise arvados.errors.NotFoundError(
1467                "Invalid data locator: '%s'" % loc_s)
1468        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1469            return b''
1470        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1471            return f.read()

Companion to local_store_put().

def local_store_head(self, loc_s, num_retries=None):
1473    def local_store_head(self, loc_s, num_retries=None):
1474        """Companion to local_store_put()."""
1475        try:
1476            locator = KeepLocator(loc_s)
1477        except ValueError:
1478            raise arvados.errors.NotFoundError(
1479                "Invalid data locator: '%s'" % loc_s)
1480        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1481            return True
1482        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1483            return True

Companion to local_store_put().

class KeepClient.KeepService(arvados._pycurlhelper.PyCurlHelper):
403    class KeepService(PyCurlHelper):
404        """Make requests to a single Keep service, and track results.
405
406        A KeepService is intended to last long enough to perform one
407        transaction (GET or PUT) against one Keep service. This can
408        involve calling either get() or put() multiple times in order
409        to retry after transient failures. However, calling both get()
410        and put() on a single instance -- or using the same instance
411        to access two different Keep services -- will not produce
412        sensible behavior.
413        """
414
415        HTTP_ERRORS = (
416            socket.error,
417            ssl.SSLError,
418            arvados.errors.HttpError,
419        )
420
421        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
422                     upload_counter=None,
423                     download_counter=None,
424                     headers={},
425                     insecure=False):
426            super(KeepClient.KeepService, self).__init__()
427            self.root = root
428            self._user_agent_pool = user_agent_pool
429            self._result = {'error': None}
430            self._usable = True
431            self._session = None
432            self._socket = None
433            self.get_headers = {'Accept': 'application/octet-stream'}
434            self.get_headers.update(headers)
435            self.put_headers = headers
436            self.upload_counter = upload_counter
437            self.download_counter = download_counter
438            self.insecure = insecure
439
440        def usable(self):
441            """Is it worth attempting a request?"""
442            return self._usable
443
444        def finished(self):
445            """Did the request succeed or encounter permanent failure?"""
446            return self._result['error'] == False or not self._usable
447
448        def last_result(self):
449            return self._result
450
451        def _get_user_agent(self):
452            try:
453                return self._user_agent_pool.get(block=False)
454            except queue.Empty:
455                return pycurl.Curl()
456
457        def _put_user_agent(self, ua):
458            try:
459                ua.reset()
460                self._user_agent_pool.put(ua, block=False)
461            except:
462                ua.close()
463
464        def get(self, locator, method="GET", timeout=None):
465            # locator is a KeepLocator object.
466            url = self.root + str(locator)
467            _logger.debug("Request: %s %s", method, url)
468            curl = self._get_user_agent()
469            ok = None
470            try:
471                with timer.Timer() as t:
472                    self._headers = {}
473                    response_body = BytesIO()
474                    curl.setopt(pycurl.NOSIGNAL, 1)
475                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
476                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
477                    curl.setopt(pycurl.URL, url.encode('utf-8'))
478                    curl.setopt(pycurl.HTTPHEADER, [
479                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
480                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
481                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
482                    if self.insecure:
483                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
484                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
485                    else:
486                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
487                    if method == "HEAD":
488                        curl.setopt(pycurl.NOBODY, True)
489                    else:
490                        curl.setopt(pycurl.HTTPGET, True)
491                    self._setcurltimeouts(curl, timeout, method=="HEAD")
492
493                    try:
494                        curl.perform()
495                    except Exception as e:
496                        raise arvados.errors.HttpError(0, str(e))
497                    finally:
498                        if self._socket:
499                            self._socket.close()
500                            self._socket = None
501                    self._result = {
502                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
503                        'body': response_body.getvalue(),
504                        'headers': self._headers,
505                        'error': False,
506                    }
507
508                ok = retry.check_http_response_success(self._result['status_code'])
509                if not ok:
510                    self._result['error'] = arvados.errors.HttpError(
511                        self._result['status_code'],
512                        self._headers.get('x-status-line', 'Error'))
513            except self.HTTP_ERRORS as e:
514                self._result = {
515                    'error': e,
516                }
517            self._usable = ok != False
518            if self._result.get('status_code', None):
519                # The client worked well enough to get an HTTP status
520                # code, so presumably any problems are just on the
521                # server side and it's OK to reuse the client.
522                self._put_user_agent(curl)
523            else:
524                # Don't return this client to the pool, in case it's
525                # broken.
526                curl.close()
527            if not ok:
528                _logger.debug("Request fail: GET %s => %s: %s",
529                              url, type(self._result['error']), str(self._result['error']))
530                return None
531            if method == "HEAD":
532                _logger.info("HEAD %s: %s bytes",
533                         self._result['status_code'],
534                         self._result.get('content-length'))
535                if self._result['headers'].get('x-keep-locator'):
536                    # This is a response to a remote block copy request, return
537                    # the local copy block locator.
538                    return self._result['headers'].get('x-keep-locator')
539                return True
540
541            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
542                         self._result['status_code'],
543                         len(self._result['body']),
544                         t.msecs,
545                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
546
547            if self.download_counter:
548                self.download_counter.add(len(self._result['body']))
549            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
550            if resp_md5 != locator.md5sum:
551                _logger.warning("Checksum fail: md5(%s) = %s",
552                                url, resp_md5)
553                self._result['error'] = arvados.errors.HttpError(
554                    0, 'Checksum fail')
555                return None
556            return self._result['body']
557
558        def put(self, hash_s, body, timeout=None, headers={}):
559            put_headers = copy.copy(self.put_headers)
560            put_headers.update(headers)
561            url = self.root + hash_s
562            _logger.debug("Request: PUT %s", url)
563            curl = self._get_user_agent()
564            ok = None
565            try:
566                with timer.Timer() as t:
567                    self._headers = {}
568                    body_reader = BytesIO(body)
569                    response_body = BytesIO()
570                    curl.setopt(pycurl.NOSIGNAL, 1)
571                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
572                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
573                    curl.setopt(pycurl.URL, url.encode('utf-8'))
574                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
575                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
576                    # response) instead of sending the request body immediately.
577                    # This allows the server to reject the request if the request
578                    # is invalid or the server is read-only, without waiting for
579                    # the client to send the entire block.
580                    curl.setopt(pycurl.UPLOAD, True)
581                    curl.setopt(pycurl.INFILESIZE, len(body))
582                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
583                    curl.setopt(pycurl.HTTPHEADER, [
584                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
585                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
586                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
587                    if self.insecure:
588                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
589                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
590                    else:
591                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
592                    self._setcurltimeouts(curl, timeout)
593                    try:
594                        curl.perform()
595                    except Exception as e:
596                        raise arvados.errors.HttpError(0, str(e))
597                    finally:
598                        if self._socket:
599                            self._socket.close()
600                            self._socket = None
601                    self._result = {
602                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
603                        'body': response_body.getvalue().decode('utf-8'),
604                        'headers': self._headers,
605                        'error': False,
606                    }
607                ok = retry.check_http_response_success(self._result['status_code'])
608                if not ok:
609                    self._result['error'] = arvados.errors.HttpError(
610                        self._result['status_code'],
611                        self._headers.get('x-status-line', 'Error'))
612            except self.HTTP_ERRORS as e:
613                self._result = {
614                    'error': e,
615                }
616            self._usable = ok != False # still usable if ok is True or None
617            if self._result.get('status_code', None):
618                # Client is functional. See comment in get().
619                self._put_user_agent(curl)
620            else:
621                curl.close()
622            if not ok:
623                _logger.debug("Request fail: PUT %s => %s: %s",
624                              url, type(self._result['error']), str(self._result['error']))
625                return False
626            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
627                         self._result['status_code'],
628                         len(body),
629                         t.msecs,
630                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
631            if self.upload_counter:
632                self.upload_counter.add(len(body))
633            return True

Make requests to a single Keep service, and track results.

A KeepService is intended to last long enough to perform one transaction (GET or PUT) against one Keep service. This can involve calling either get() or put() multiple times in order to retry after transient failures. However, calling both get() and put() on a single instance – or using the same instance to access two different Keep services – will not produce sensible behavior.

KeepClient.KeepService(root, user_agent_pool=<queue.LifoQueue object>, upload_counter=None, download_counter=None, headers={}, insecure=False)
421        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
422                     upload_counter=None,
423                     download_counter=None,
424                     headers={},
425                     insecure=False):
426            super(KeepClient.KeepService, self).__init__()
427            self.root = root
428            self._user_agent_pool = user_agent_pool
429            self._result = {'error': None}
430            self._usable = True
431            self._session = None
432            self._socket = None
433            self.get_headers = {'Accept': 'application/octet-stream'}
434            self.get_headers.update(headers)
435            self.put_headers = headers
436            self.upload_counter = upload_counter
437            self.download_counter = download_counter
438            self.insecure = insecure
HTTP_ERRORS = (<class 'OSError'>, <class 'ssl.SSLError'>, <class 'arvados.errors.HttpError'>)
root
get_headers
put_headers
upload_counter
download_counter
insecure
def usable(self):
440        def usable(self):
441            """Is it worth attempting a request?"""
442            return self._usable

Is it worth attempting a request?

def finished(self):
444        def finished(self):
445            """Did the request succeed or encounter permanent failure?"""
446            return self._result['error'] == False or not self._usable

Did the request succeed or encounter permanent failure?

def last_result(self):
448        def last_result(self):
449            return self._result
def get(self, locator, method="GET", timeout=None):
    """Fetch (or, with method="HEAD", probe) one block from this service.

    :locator: a KeepLocator object naming the block.
    :method: "GET" to fetch the block body, "HEAD" to check existence.
    :timeout: passed through to _setcurltimeouts().

    Returns the block body (bytes) on a successful GET; on a
    successful HEAD, returns the x-keep-locator response header when
    present (remote block copy case), otherwise True.  Returns None on
    failure.  Details of the attempt are recorded in self._result.
    """
    # locator is a KeepLocator object.
    url = self.root + str(locator)
    _logger.debug("Request: %s %s", method, url)
    curl = self._get_user_agent()
    ok = None
    try:
        with timer.Timer() as t:
            self._headers = {}
            response_body = BytesIO()
            curl.setopt(pycurl.NOSIGNAL, 1)
            curl.setopt(pycurl.OPENSOCKETFUNCTION,
                        lambda *args, **kwargs: self._socket_open(*args, **kwargs))
            curl.setopt(pycurl.URL, url.encode('utf-8'))
            curl.setopt(pycurl.HTTPHEADER, [
                '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
            curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
            curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
            if self.insecure:
                curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                curl.setopt(pycurl.SSL_VERIFYHOST, 0)
            else:
                curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
            if method == "HEAD":
                curl.setopt(pycurl.NOBODY, True)
            else:
                curl.setopt(pycurl.HTTPGET, True)
            self._setcurltimeouts(curl, timeout, method=="HEAD")

            try:
                curl.perform()
            except Exception as e:
                # Normalize any curl-level failure into an HttpError.
                raise arvados.errors.HttpError(0, str(e))
            finally:
                if self._socket:
                    self._socket.close()
                    self._socket = None
            self._result = {
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue(),
                'headers': self._headers,
                'error': False,
            }

        ok = retry.check_http_response_success(self._result['status_code'])
        if not ok:
            self._result['error'] = arvados.errors.HttpError(
                self._result['status_code'],
                self._headers.get('x-status-line', 'Error'))
    except self.HTTP_ERRORS as e:
        self._result = {
            'error': e,
        }
    # ok is None (no HTTP status), True (success) or False; only a
    # low-level failure (ok is None with an error) makes the service
    # unusable.
    self._usable = ok != False
    if self._result.get('status_code', None):
        # The client worked well enough to get an HTTP status
        # code, so presumably any problems are just on the
        # server side and it's OK to reuse the client.
        self._put_user_agent(curl)
    else:
        # Don't return this client to the pool, in case it's
        # broken.
        curl.close()
    if not ok:
        _logger.debug("Request fail: GET %s => %s: %s",
                      url, type(self._result['error']), str(self._result['error']))
        return None
    if method == "HEAD":
        # Bug fix: the content length lives in the response headers,
        # not at the top level of self._result -- the old lookup
        # (self._result.get('content-length')) always logged None.
        _logger.info("HEAD %s: %s bytes",
                 self._result['status_code'],
                 self._result['headers'].get('content-length'))
        if self._result['headers'].get('x-keep-locator'):
            # This is a response to a remote block copy request, return
            # the local copy block locator.
            return self._result['headers'].get('x-keep-locator')
        return True

    _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                 self._result['status_code'],
                 len(self._result['body']),
                 t.msecs,
                 1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

    if self.download_counter:
        self.download_counter.add(len(self._result['body']))
    # Verify the payload against the locator's md5 before trusting it.
    resp_md5 = hashlib.md5(self._result['body']).hexdigest()
    if resp_md5 != locator.md5sum:
        _logger.warning("Checksum fail: md5(%s) = %s",
                        url, resp_md5)
        self._result['error'] = arvados.errors.HttpError(
            0, 'Checksum fail')
        return None
    return self._result['body']
def put(self, hash_s, body, timeout=None, headers=None):
    """Upload one block of data to this Keep service.

    :hash_s: the block's locator/hash string; appended to self.root
      to form the request URL.
    :body: the block data (bytes).
    :timeout: passed through to _setcurltimeouts().
    :headers: extra headers for this request, merged over
      self.put_headers.  Defaults to None (not a shared mutable
      {}); semantics are unchanged.

    Returns True on success, False on failure; details of the
    attempt are recorded in self._result.
    """
    put_headers = copy.copy(self.put_headers)
    if headers:
        put_headers.update(headers)
    url = self.root + hash_s
    _logger.debug("Request: PUT %s", url)
    curl = self._get_user_agent()
    ok = None
    try:
        with timer.Timer() as t:
            self._headers = {}
            body_reader = BytesIO(body)
            response_body = BytesIO()
            curl.setopt(pycurl.NOSIGNAL, 1)
            curl.setopt(pycurl.OPENSOCKETFUNCTION,
                        lambda *args, **kwargs: self._socket_open(*args, **kwargs))
            curl.setopt(pycurl.URL, url.encode('utf-8'))
            # Using UPLOAD tells cURL to wait for a "go ahead" from the
            # Keep server (in the form of a HTTP/1.1 "100 Continue"
            # response) instead of sending the request body immediately.
            # This allows the server to reject the request if the request
            # is invalid or the server is read-only, without waiting for
            # the client to send the entire block.
            curl.setopt(pycurl.UPLOAD, True)
            curl.setopt(pycurl.INFILESIZE, len(body))
            curl.setopt(pycurl.READFUNCTION, body_reader.read)
            curl.setopt(pycurl.HTTPHEADER, [
                '{}: {}'.format(k,v) for k,v in put_headers.items()])
            curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
            curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
            if self.insecure:
                curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                curl.setopt(pycurl.SSL_VERIFYHOST, 0)
            else:
                curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
            self._setcurltimeouts(curl, timeout)
            try:
                curl.perform()
            except Exception as e:
                # Normalize any curl-level failure into an HttpError.
                raise arvados.errors.HttpError(0, str(e))
            finally:
                if self._socket:
                    self._socket.close()
                    self._socket = None
            self._result = {
                'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                'body': response_body.getvalue().decode('utf-8'),
                'headers': self._headers,
                'error': False,
            }
        ok = retry.check_http_response_success(self._result['status_code'])
        if not ok:
            self._result['error'] = arvados.errors.HttpError(
                self._result['status_code'],
                self._headers.get('x-status-line', 'Error'))
    except self.HTTP_ERRORS as e:
        self._result = {
            'error': e,
        }
    self._usable = ok != False # still usable if ok is True or None
    if self._result.get('status_code', None):
        # Client is functional. See comment in get().
        self._put_user_agent(curl)
    else:
        curl.close()
    if not ok:
        _logger.debug("Request fail: PUT %s => %s: %s",
                      url, type(self._result['error']), str(self._result['error']))
        return False
    _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                 self._result['status_code'],
                 len(body),
                 t.msecs,
                 1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
    if self.upload_counter:
        self.upload_counter.add(len(body))
    return True
class KeepWriterQueue(queue.Queue):
    """Work queue shared by KeepWriterThread workers writing one block.

    Tracks how many replicas have been stored and which storage
    classes have been confirmed, so workers can decide whether more
    PUT attempts are needed.
    """

    def __init__(self, copies, classes=None):
        """Set up tracking for one block write.

        :copies: number of replicas wanted.
        :classes: list of storage class names wanted (default: none).
          Defaults to None (not a shared mutable []); the effective
          default is still an empty list.
        """
        queue.Queue.__init__(self) # Old-style superclass
        if classes is None:
            classes = []
        self.wanted_copies = copies
        self.wanted_storage_classes = classes
        self.successful_copies = 0
        self.confirmed_storage_classes = {}
        self.response = None
        # Cleared when a response shows the cluster does not report
        # storage class confirmations.
        self.storage_classes_tracking = True
        self.queue_data_lock = threading.RLock()
        self.pending_tries = max(copies, len(classes))
        self.pending_tries_notification = threading.Condition()

    def write_success(self, response, replicas_nr, classes_confirmed):
        """Record a successful PUT and wake any waiting workers.

        :response: the PUT response body (the block locator).
        :replicas_nr: replicas stored by this PUT.
        :classes_confirmed: dict of storage class -> copies confirmed,
          or None if the service didn't report confirmations.
        """
        with self.queue_data_lock:
            self.successful_copies += replicas_nr
            if classes_confirmed is None:
                self.storage_classes_tracking = False
            elif self.storage_classes_tracking:
                for st_class, st_copies in classes_confirmed.items():
                    try:
                        self.confirmed_storage_classes[st_class] += st_copies
                    except KeyError:
                        self.confirmed_storage_classes[st_class] = st_copies
                self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
            self.response = response
        with self.pending_tries_notification:
            self.pending_tries_notification.notify_all()

    def write_fail(self, ks):
        """Record a failed PUT; allow one more try and wake a worker."""
        with self.pending_tries_notification:
            self.pending_tries += 1
            self.pending_tries_notification.notify()

    def pending_copies(self):
        """Return how many more replicas are still wanted."""
        with self.queue_data_lock:
            return self.wanted_copies - self.successful_copies

    def satisfied_classes(self):
        """Return the storage classes confirmed so far.

        Returns None when confirmation tracking is disabled, to notify
        the outer loop that the expectation can't be checked.
        """
        with self.queue_data_lock:
            if not self.storage_classes_tracking:
                # Notifies disabled storage classes expectation to
                # the outer loop.
                return None
        return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

    def pending_classes(self):
        """Return the wanted storage classes not yet fully confirmed."""
        with self.queue_data_lock:
            if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                return []
            unsatisfied_classes = copy.copy(self.wanted_storage_classes)
            for st_class, st_copies in self.confirmed_storage_classes.items():
                if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                    unsatisfied_classes.remove(st_class)
            return unsatisfied_classes

    def get_next_task(self):
        """Return the next (service, service_root) task for a worker.

        Blocks while no tries are pending; raises queue.Empty when the
        write is satisfied or no services are left to try.
        """
        with self.pending_tries_notification:
            while True:
                if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                    # This notify_all() is unnecessary --
                    # write_success() already called notify_all()
                    # when pending<1 became true, so it's not
                    # possible for any other thread to be in
                    # wait() now -- but it's cheap insurance
                    # against deadlock so we do it anyway:
                    self.pending_tries_notification.notify_all()
                    # Drain the queue and then raise Queue.Empty
                    while True:
                        self.get_nowait()
                        self.task_done()
                elif self.pending_tries > 0:
                    service, service_root = self.get_nowait()
                    if service.finished():
                        self.task_done()
                        continue
                    self.pending_tries -= 1
                    return service, service_root
                elif self.empty():
                    self.pending_tries_notification.notify_all()
                    raise queue.Empty
                else:
                    self.pending_tries_notification.wait()

Create a queue object with a given maximum size.

If maxsize is <= 0, the queue size is infinite.

KeepClient.KeepWriterQueue(copies, classes=[])
637        def __init__(self, copies, classes=[]):
638            queue.Queue.__init__(self) # Old-style superclass
639            self.wanted_copies = copies
640            self.wanted_storage_classes = classes
641            self.successful_copies = 0
642            self.confirmed_storage_classes = {}
643            self.response = None
644            self.storage_classes_tracking = True
645            self.queue_data_lock = threading.RLock()
646            self.pending_tries = max(copies, len(classes))
647            self.pending_tries_notification = threading.Condition()
wanted_copies
wanted_storage_classes
successful_copies
confirmed_storage_classes
response
storage_classes_tracking
queue_data_lock
pending_tries
pending_tries_notification
def write_success(self, response, replicas_nr, classes_confirmed):
649        def write_success(self, response, replicas_nr, classes_confirmed):
650            with self.queue_data_lock:
651                self.successful_copies += replicas_nr
652                if classes_confirmed is None:
653                    self.storage_classes_tracking = False
654                elif self.storage_classes_tracking:
655                    for st_class, st_copies in classes_confirmed.items():
656                        try:
657                            self.confirmed_storage_classes[st_class] += st_copies
658                        except KeyError:
659                            self.confirmed_storage_classes[st_class] = st_copies
660                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
661                self.response = response
662            with self.pending_tries_notification:
663                self.pending_tries_notification.notify_all()
def write_fail(self, ks):
665        def write_fail(self, ks):
666            with self.pending_tries_notification:
667                self.pending_tries += 1
668                self.pending_tries_notification.notify()
def pending_copies(self):
670        def pending_copies(self):
671            with self.queue_data_lock:
672                return self.wanted_copies - self.successful_copies
def satisfied_classes(self):
674        def satisfied_classes(self):
675            with self.queue_data_lock:
676                if not self.storage_classes_tracking:
677                    # Notifies disabled storage classes expectation to
678                    # the outer loop.
679                    return None
680            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
def pending_classes(self):
682        def pending_classes(self):
683            with self.queue_data_lock:
684                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
685                    return []
686                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
687                for st_class, st_copies in self.confirmed_storage_classes.items():
688                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
689                        unsatisfied_classes.remove(st_class)
690                return unsatisfied_classes
def get_next_task(self):
692        def get_next_task(self):
693            with self.pending_tries_notification:
694                while True:
695                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
696                        # This notify_all() is unnecessary --
697                        # write_success() already called notify_all()
698                        # when pending<1 became true, so it's not
699                        # possible for any other thread to be in
700                        # wait() now -- but it's cheap insurance
701                        # against deadlock so we do it anyway:
702                        self.pending_tries_notification.notify_all()
703                        # Drain the queue and then raise Queue.Empty
704                        while True:
705                            self.get_nowait()
706                            self.task_done()
707                    elif self.pending_tries > 0:
708                        service, service_root = self.get_nowait()
709                        if service.finished():
710                            self.task_done()
711                            continue
712                        self.pending_tries -= 1
713                        return service, service_root
714                    elif self.empty():
715                        self.pending_tries_notification.notify_all()
716                        raise queue.Empty
717                    else:
718                        self.pending_tries_notification.wait()
Inherited Members
queue.Queue
maxsize
mutex
not_empty
not_full
all_tasks_done
unfinished_tasks
task_done
join
qsize
empty
full
put
get
put_nowait
get_nowait
class KeepWriterThreadPool(object):
    """Fan one block write out over enough KeepWriterThread workers
    to reach the requested replica count."""

    def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
        self.total_task_nr = 0
        # One worker suffices when a single service can store every
        # requested copy; otherwise spread the work over several.
        if max_service_replicas and max_service_replicas < copies:
            thread_count = int(math.ceil(1.0 * copies / max_service_replicas))
        else:
            thread_count = 1
        _logger.debug("Pool max threads is %d", thread_count)
        self.queue = KeepClient.KeepWriterQueue(copies, classes)
        # Create workers
        self.workers = [
            KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
            for _ in range(thread_count)
        ]

    def add_task(self, ks, service_root):
        """Enqueue one (service, service_root) pair for the workers."""
        self.queue.put((ks, service_root))
        self.total_task_nr += 1

    def done(self):
        """Return (successful_copies, satisfied_classes) so far."""
        return self.queue.successful_copies, self.queue.satisfied_classes()

    def join(self):
        """Start every worker, then block until the queue is drained."""
        # Start workers
        for worker in self.workers:
            worker.start()
        # Wait for finished work
        self.queue.join()

    def response(self):
        """Return the body of the most recent successful PUT, if any."""
        return self.queue.response
KeepClient.KeepWriterThreadPool( data, data_hash, copies, max_service_replicas, timeout=None, classes=[])
722        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
723            self.total_task_nr = 0
724            if (not max_service_replicas) or (max_service_replicas >= copies):
725                num_threads = 1
726            else:
727                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
728            _logger.debug("Pool max threads is %d", num_threads)
729            self.workers = []
730            self.queue = KeepClient.KeepWriterQueue(copies, classes)
731            # Create workers
732            for _ in range(num_threads):
733                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
734                self.workers.append(w)
total_task_nr
workers
queue
def add_task(self, ks, service_root):
736        def add_task(self, ks, service_root):
737            self.queue.put((ks, service_root))
738            self.total_task_nr += 1
def done(self):
740        def done(self):
741            return self.queue.successful_copies, self.queue.satisfied_classes()
def join(self):
743        def join(self):
744            # Start workers
745            for worker in self.workers:
746                worker.start()
747            # Wait for finished work
748            self.queue.join()
def response(self):
750        def response(self):
751            return self.queue.response
class KeepWriterThread(threading.Thread):
    """Daemon thread that pulls (service, service_root) tasks off a
    KeepWriterQueue and PUTs one block to each service."""

    class TaskFailed(RuntimeError):
        """Raised internally when a single PUT attempt fails."""
        pass

    def __init__(self, queue, data, data_hash, timeout=None):
        super(KeepClient.KeepWriterThread, self).__init__()
        self.timeout = timeout
        self.queue = queue
        self.data = data
        self.data_hash = data_hash
        self.daemon = True

    def run(self):
        """Consume tasks until the queue signals there is nothing left."""
        while True:
            try:
                service, service_root = self.queue.get_next_task()
            except queue.Empty:
                return
            try:
                locator, copies, classes = self.do_task(service, service_root)
            except Exception as e:
                if not isinstance(e, self.TaskFailed):
                    _logger.exception("Exception in KeepWriterThread")
                self.queue.write_fail(service)
            else:
                self.queue.write_success(locator, copies, classes)
            finally:
                self.queue.task_done()

    def do_task(self, service, service_root):
        """PUT the block to one service.

        Returns (locator, replicas_stored, classes_confirmed); raises
        TaskFailed when the PUT did not succeed.
        """
        classes = self.queue.pending_classes()
        headers = {}
        if classes:
            classes.sort()
            headers['X-Keep-Storage-Classes'] = ', '.join(classes)
        success = bool(service.put(self.data_hash,
                                    self.data,
                                    timeout=self.timeout,
                                    headers=headers))
        result = service.last_result()

        if not success:
            if result.get('status_code'):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.data_hash,
                              result.get('status_code'),
                              result.get('body'))
            raise self.TaskFailed()

        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                      str(threading.current_thread()),
                      self.data_hash,
                      len(self.data),
                      service_root)
        try:
            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
        except (KeyError, ValueError):
            replicas_stored = 1

        classes_confirmed = {}
        try:
            confirmations = result['headers']['x-keep-storage-classes-confirmed']
            for item in confirmations.replace(' ', '').split(','):
                if '=' in item:
                    class_name, class_copies = item.split('=')[:2]
                    classes_confirmed[class_name] = int(class_copies)
        except (KeyError, ValueError):
            # Storage classes confirmed header missing or corrupt
            classes_confirmed = None

        return result['body'].strip(), replicas_stored, classes_confirmed

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

KeepClient.KeepWriterThread(queue, data, data_hash, timeout=None)
757        def __init__(self, queue, data, data_hash, timeout=None):
758            super(KeepClient.KeepWriterThread, self).__init__()
759            self.timeout = timeout
760            self.queue = queue
761            self.data = data
762            self.data_hash = data_hash
763            self.daemon = True

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

timeout
queue
data
data_hash
daemon
1108    @property
1109    def daemon(self):
1110        """A boolean value indicating whether this thread is a daemon thread.
1111
1112        This must be set before start() is called, otherwise RuntimeError is
1113        raised. Its initial value is inherited from the creating thread; the
1114        main thread is not a daemon thread and therefore all threads created in
1115        the main thread default to daemon = False.
1116
1117        The entire Python program exits when only daemon threads are left.
1118
1119        """
1120        assert self._initialized, "Thread.__init__() not called"
1121        return self._daemonic

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

def run(self):
765        def run(self):
766            while True:
767                try:
768                    service, service_root = self.queue.get_next_task()
769                except queue.Empty:
770                    return
771                try:
772                    locator, copies, classes = self.do_task(service, service_root)
773                except Exception as e:
774                    if not isinstance(e, self.TaskFailed):
775                        _logger.exception("Exception in KeepWriterThread")
776                    self.queue.write_fail(service)
777                else:
778                    self.queue.write_success(locator, copies, classes)
779                finally:
780                    self.queue.task_done()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

def do_task(self, service, service_root):
782        def do_task(self, service, service_root):
783            classes = self.queue.pending_classes()
784            headers = {}
785            if len(classes) > 0:
786                classes.sort()
787                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
788            success = bool(service.put(self.data_hash,
789                                        self.data,
790                                        timeout=self.timeout,
791                                        headers=headers))
792            result = service.last_result()
793
794            if not success:
795                if result.get('status_code'):
796                    _logger.debug("Request fail: PUT %s => %s %s",
797                                  self.data_hash,
798                                  result.get('status_code'),
799                                  result.get('body'))
800                raise self.TaskFailed()
801
802            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
803                          str(threading.current_thread()),
804                          self.data_hash,
805                          len(self.data),
806                          service_root)
807            try:
808                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
809            except (KeyError, ValueError):
810                replicas_stored = 1
811
812            classes_confirmed = {}
813            try:
814                scch = result['headers']['x-keep-storage-classes-confirmed']
815                for confirmation in scch.replace(' ', '').split(','):
816                    if '=' in confirmation:
817                        stored_class, stored_copies = confirmation.split('=')[:2]
818                        classes_confirmed[stored_class] = int(stored_copies)
819            except (KeyError, ValueError):
820                # Storage classes confirmed header missing or corrupt
821                classes_confirmed = None
822
823            return result['body'].strip(), replicas_stored, classes_confirmed
Inherited Members
threading.Thread
start
join
name
ident
is_alive
isDaemon
setDaemon
getName
setName
native_id
class KeepClient.KeepWriterThread.TaskFailed(builtins.RuntimeError):
755        class TaskFailed(RuntimeError): pass

Unspecified run-time error.

Inherited Members
builtins.RuntimeError
RuntimeError
builtins.BaseException
with_traceback
args